Thanks Jungtaek for starting this discussion. What our team wants to do is ingest data into an Iceberg table at a one-minute frequency. This frequency also leads to a large number of small files.
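For reference, the ingest we have in mind looks roughly like this -- a minimal sketch assuming the Structured Streaming sink from PR 1261; the rate source and the table/checkpoint locations are only illustrative:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.Trigger

    val spark = SparkSession.builder().appName("iceberg-minute-ingest").getOrCreate()

    // Illustrative source; in practice this would be the real event stream.
    val events = spark.readStream.format("rate").load()

    // Every one-minute micro-batch is committed as a fast append, so data files,
    // manifests, and metadata versions all accumulate quickly.
    val query = events.writeStream
      .format("iceberg")
      .outputMode("append")
      .trigger(Trigger.ProcessingTime("1 minute"))
      .option("path", "hdfs://warehouse/db/events")              // illustrative table location
      .option("checkpointLocation", "hdfs://checkpoints/events") // illustrative checkpoint location
      .start()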
Auto compaction (rewriting manifests and data files) in the streaming sink (writer) looks wonderful. As you said, we also need to consider automatic `expireSnapshots` and `write.metadata.delete-after-commit.enabled`.

Ideally, we would provide a streaming writer with two config options:
- auto-compact = true (like Databricks auto compaction [1]: after a successful commit, the selected files, including manifests and data files, are merged into a reasonable compaction)
- snapshot-time-to-live = 10 days (limits how far back you can time travel)

I feel that this would be very useful for users and could really bring Iceberg into the near-real-time space.

[1] https://docs.databricks.com/delta/optimizations/auto-optimize.html#auto-compaction

Best,
Jingsong

On Wed, Jul 29, 2020 at 3:27 AM Ryan Blue <rb...@netflix.com.invalid> wrote:

> Thanks for the PR! We will review it soon.
>
> The RewriteManifestsAction is a parallel way to rewrite manifests and then commit them using the RewriteManifests table operation. The idea here is that the metadata tree (manifest list and manifest files) functions as an index over the data in the table. Sometimes, you want to rewrite that index to make finding data easier.
>
> When data is appended, the new data files are placed into a new manifest that is added to the manifest list file. Manifests are also compacted when there are enough of them or a group reaches a certain size. Both of these operations tend to cluster data in manifests by the time it was added to the table. This is great for time-series data because you get manifests that each hold a few days or a week of data, and a time filter can find the right manifests and data files quickly. But if you want to look up data by a dimension other than time -- for example, using the bucket of an ID -- then the natural clustering doesn't work well. In that case, you can use RewriteManifests or the RewriteManifestsAction to cluster data files by some key. That really helps speed up queries for specific rows.
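A minimal sketch of the manifest re-clustering described above, assuming the core RewriteManifests API on a path-based (HadoopTables) table; the table location and the assumption that bucket(id) is the first partition field are only illustrative:

    import org.apache.hadoop.conf.Configuration
    import org.apache.iceberg.DataFile
    import org.apache.iceberg.hadoop.HadoopTables

    val table = new HadoopTables(new Configuration()).load("hdfs://warehouse/db/events") // illustrative location

    // Rewrite manifests so data files are grouped by the id bucket instead of by
    // ingest time, which lets lookups on id prune manifests effectively.
    table.rewriteManifests()
      .clusterBy((file: DataFile) => file.partition().get(0, classOf[Integer])) // assumes bucket(id) is partition field 0
      .commit()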
> On Mon, Jul 27, 2020 at 8:41 PM Jungtaek Lim <kabhwan.opensou...@gmail.com> wrote:
>
>> I'd love to contribute documentation about the actions - I just need some time to understand the needs for some of them (like RewriteManifestsAction).
>>
>> I just submitted a PR for the structured streaming sink [1]. I mentioned expireSnapshots() there with a link to the javadoc page, but it would be nice to also have a code example on the API page, since it isn't specific to Spark (any fast-changing case, including Flink streaming, would need this as well).
>>
>> 1. https://github.com/apache/iceberg/pull/1261
>>
>> On Tue, Jul 28, 2020 at 10:39 AM Ryan Blue <rb...@netflix.com> wrote:
>>
>>> > it seems to fail when a high-rate streaming write query is running on the other side
>>>
>>> This kind of situation is where you'd want to tune the number of retries for a table. That's a likely source of the problem. We can also check to make sure we're being smart about conflict detection. A rewrite needs to scan any manifests that might have data files that conflict, which is why retries can take a little while. The farther the rewrite is from active data partitions, the better. And we can double-check that we're using the manifest file partition ranges to avoid doing unnecessary work.
>>>
>>> > from the end user's point of view, none of the actions are documented
>>>
>>> Yes, we need to add documentation for the actions. If you're interested, feel free to open PRs! The actions are fairly new, so we don't yet have docs for them.
>>>
>>> Same with the streaming sink -- we just need someone to write up docs and contribute them. We don't use the streaming sink ourselves, so I've unfortunately overlooked it.
>>>
>>> On Mon, Jul 27, 2020 at 3:25 PM Jungtaek Lim <kabhwan.opensou...@gmail.com> wrote:
>>>
>>>> Thanks for the quick response!
>>>>
>>>> And yes, I also experimented with expireSnapshots() and it looked good. I can imagine alternative conditions for expiring snapshots (like adjusting the "granularity" between snapshots instead of removing all snapshots before a specific timestamp), but for now it's just an idea and I don't have real-world needs to back it up.
>>>>
>>>> I also went through RewriteDataFilesAction and it looked good as well. There's an existing GitHub issue to make the action more intelligent, which is valid and good to add. One thing I noticed is that it's a fairly time-consuming task (expected, for sure, and not a problem), and it seems to fail when a high-rate streaming write query is running on the other side (this is a concern). I guess the action only touches old snapshots, so no conflict would be expected against fast appends. It would be nice to know whether this is expected behavior and it's recommended to stop all writes before running the action, or whether it sounds like a bug.
>>>>
>>>> I haven't gone through RewriteManifestsAction, though for now I am only curious about the need for it. I'm eager to experiment with the streaming source which is in review - I don't know the details of Iceberg well enough to be qualified to participate in reviewing it. I'd rather play with it when it's available and use that as a chance to learn about Iceberg itself.
>>>>
>>>> Btw, from the end user's point of view, none of the actions are documented - even the structured streaming sink is not documented and I had to go through the code. While I think it's obvious that the streaming sink should be on the Spark documentation page (I wonder why it's missing), would we want to document the actions as well? It looks like these actions are still evolving, so I'm wondering whether we are waiting for them to stabilize, or the documentation was simply missed.
>>>>
>>>> Thanks,
>>>> Jungtaek Lim
>>>>
>>>> On Tue, Jul 28, 2020 at 2:45 AM Ryan Blue <rb...@netflix.com.invalid> wrote:
>>>>
>>>>> Hi Jungtaek,
>>>>>
>>>>> That setting controls whether Iceberg cleans up old copies of the table metadata file. The metadata file holds references to all of the table's snapshots (that have not expired) and is self-contained. No operations need to access previous metadata files.
>>>>>
>>>>> Those aren't typically that large, but they can be when streaming data because you create a lot of versions. For streaming, I'd recommend turning it on and making sure you're running `expireSnapshots()` regularly to prune old table versions -- although expiring snapshots will remove them from table metadata and limit how far back you can time travel.
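A minimal sketch of that kind of regular snapshot expiration, assuming the same path-based table; the 10-day retention is only illustrative:

    import java.util.concurrent.TimeUnit
    import org.apache.hadoop.conf.Configuration
    import org.apache.iceberg.hadoop.HadoopTables

    val table = new HadoopTables(new Configuration()).load("hdfs://warehouse/db/events") // illustrative location

    // Remove snapshots older than 10 days; time travel to those versions is no longer
    // possible, and files reachable only from expired snapshots are cleaned up.
    val tenDaysAgo = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(10)
    table.expireSnapshots()
      .expireOlderThan(tenDaysAgo)
      .commit()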
>>>>>
>>>>> On Mon, Jul 27, 2020 at 4:33 AM Jungtaek Lim <kabhwan.opensou...@gmail.com> wrote:
>>>>>
>>>>>> Hi devs,
>>>>>>
>>>>>> I'm experimenting with Apache Iceberg as a Structured Streaming sink - I plan to experiment with the source as well, but I see that PR is still in review.
>>>>>>
>>>>>> It seems that "fast append" pretty much helps to keep commit latency reasonable, though the metadata directory grows too fast. I found the option 'write.metadata.delete-after-commit.enabled' (false by default), enabled it, and the overall size looks fine afterwards.
>>>>>>
>>>>>> That said, given that the option is false by default, I'm wondering what would be impacted by turning it on. My understanding is that it doesn't affect time travel (as that refers to a snapshot), and restoring is also done from a snapshot, so I'm not sure what to consider before turning on the option.
>>>>>>
>>>>>> Thanks,
>>>>>> Jungtaek Lim
>>>>>>
>>>>>
>>>>> --
>>>>> Ryan Blue
>>>>> Software Engineer
>>>>> Netflix
>>>>
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>

--
Best, Jingsong Lee
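For reference, the metadata cleanup setting discussed in this thread is a table property and can be enabled together with a cap on retained metadata versions -- a minimal sketch; the 'write.metadata.previous-versions-max' property name and the value shown are assumptions worth double-checking against TableProperties:

    import org.apache.hadoop.conf.Configuration
    import org.apache.iceberg.hadoop.HadoopTables

    val table = new HadoopTables(new Configuration()).load("hdfs://warehouse/db/events") // illustrative location

    // Delete old metadata.json versions after each commit and keep only a bounded history.
    table.updateProperties()
      .set("write.metadata.delete-after-commit.enabled", "true")
      .set("write.metadata.previous-versions-max", "10") // assumed property name/value; check TableProperties
      .commit()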