Forwarding the invite for the discussion we plan to have with the Iceberg folks, as some of you might be interested in this.
---------- Forwarded message ---------
From: Brian Olsen <bitsondata...@gmail.com>
Date: Mon, Apr 8, 2024, 18:29
Subject: Re: Flink table maintenance
To: <d...@iceberg.apache.org>

Hey Iceberg nation,

I would like to share details about the meeting this Wednesday, where we will further discuss Péter's proposal on Flink maintenance tasks.

Calendar link: https://calendar.app.google/83HGYWXoQJ8zXuVCA
List discussion: https://lists.apache.org/thread/10mdf9zo6pn0dfq791nf4w1m7jh9k3sl
Design doc: https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit?usp=sharing

On Mon, Apr 1, 2024 at 8:52 PM Manu Zhang <owenzhang1...@gmail.com> wrote:

> Hi Peter,
>
>> Are you proposing to create a user-facing locking feature in Iceberg, or just something for internal use?
>
> Since it's a general issue, I'm proposing to create a general user interface first, while the implementation can be left to users. For example, we use Airflow to schedule maintenance jobs, and we can check in-progress jobs with the Airflow API. A Hive Metastore lock might be another option we can implement for users.
>
> Thanks,
> Manu
>
> On Tue, Apr 2, 2024 at 5:26 AM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>
>> Hi Ajantha,
>>
>> I thought about enabling post-commit topology based compaction for sinks using options, like we use for the parametrization of streaming reads [1]. I think it will be hard to do in a user-friendly way - because of the high number of parameters - but I think it is a possible solution with sensible defaults.
>>
>> There is a batch-like solution for data file compaction already available [2], but I do not see how we could extend Flink SQL to be able to call it.
>>
>> Writing to a branch using Flink SQL should be another thread, but my first guess is that it shouldn't be hard to implement using options, like:
>> /*+ OPTIONS('branch'='b1') */
>> since writing to a branch is already working through the Java API [3].
>>
>> Thanks,
>> Peter
>>
>> 1 - https://iceberg.apache.org/docs/latest/flink-queries/#flink-streaming-read
>> 2 - https://github.com/apache/iceberg/blob/820fc3ceda386149f42db8b54e6db9171d1a3a6d/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java
>> 3 - https://iceberg.apache.org/docs/latest/flink-writes/#branch-writes
>>
>> On Mon, Apr 1, 2024, 16:30 Ajantha Bhat <ajanthab...@gmail.com> wrote:
>>
>>> Thanks for the proposal, Peter.
>>>
>>> I just wanted to know whether we have any plans to support SQL syntax for table maintenance (like a CALL procedure) for pure Flink SQL users.
>>> I didn't see any custom SQL parser plugin support in Flink. I also saw that branch writes don't have SQL support (only branch reads use options), so I am not sure about the roadmap of Iceberg SQL support in Flink. Was there any discussion before?
>>>
>>> - Ajantha
>>>
>>> On Mon, Apr 1, 2024 at 7:51 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>>>
>>>> Hi Manu,
>>>>
>>>> Just to clarify:
>>>> - Are you proposing to create a user-facing locking feature in Iceberg, or just something for internal use?
>>>>
>>>> I think we shouldn't add locking to Iceberg's user-facing scope at this stage. A fully featured locking system has many more features that we would need (priorities, fairness, timeouts, etc.). I could be tempted when we are talking about the REST catalog, but I think that should be further down the road, if ever...
>>>>
>>>> About using the tags:
>>>> - I wholeheartedly agree that using tags is not intuitive, and I see your points in most of your arguments. OTOH, introducing a new requirement (a locking mechanism) seems like a wrong direction to me.
>>>> - We already defined a requirement (atomic changes on the table) for the Catalog implementations, which could be used to achieve our goal here.
>>>> - We also already store technical data in snapshot properties in Flink jobs (JobId, OperatorId, CheckpointId). Maybe technical tags/table properties are not a big stretch.
>>>>
>>>> Or we can look at these tags or metadata as 'technical data' which is internal to Iceberg and shouldn't be expressed on the external API. My concern is:
>>>> - Would it be used often enough to be worth the additional complexity?
>>>>
>>>> Knowing that Spark compaction is struggling with the same issue is a good indicator, but we would probably need more use cases before introducing a new feature with this complexity, or a simpler solution.
>>>>
>>>> Thanks,
>>>> Peter
>>>>
>>>> On Mon, Apr 1, 2024, 10:18 Manu Zhang <owenzhang1...@gmail.com> wrote:
>>>>
>>>>>> What would the community think of exploiting tags for preventing concurrent maintenance loop executions?
>>>>>
>>>>> This issue is not specific to Flink maintenance jobs. We have a service scheduling Spark maintenance jobs by watching table commits. When we don't check in-progress maintenance jobs for the same table, multiple jobs will run concurrently and have conflicts.
>>>>>
>>>>> Basically, I think we need a lock mechanism like the metastore lock <https://iceberg.apache.org/docs/nightly/configuration/#hadoop-configuration> if we want to handle it for users. However, using a tag doesn't look intuitive to me. We are also mixing user data with system metadata.
>>>>> Maybe we can define some general interfaces and leave the implementation to users in the first version.
>>>>>
>>>>> Regards,
>>>>> Manu
>>>>>
>>>>> On Fri, Mar 29, 2024 at 1:59 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>>>>>
>>>>>> What would the community think of exploiting tags for preventing concurrent maintenance loop executions?
>>>>>>
>>>>>> The issue:
>>>>>> Some maintenance tasks can't run in parallel, like DeleteOrphanFiles vs. ExpireSnapshots, or RewriteDataFiles vs. RewriteManifestFiles. We make sure not to run tasks started by a single trigger concurrently by serializing them, but there are no loops in Flink, so we can't synchronize tasks started by the next trigger.
>>>>>>
>>>>>> In the document, I describe that we need to rely on the user to ensure that the rate limit is high enough to prevent concurrent triggers.
>>>>>>
>>>>>> Proposal:
>>>>>> When firing a trigger, the RateLimiter could check and create an Iceberg table tag [1] for the current table snapshot, with the name '__flink_maintenance'. When the execution finishes, we remove this tag. The RateLimiter keeps accumulating changes and doesn't fire new triggers while it finds this tag on the table.
>>>>>> The solution relies on Flink's 'forceNonParallel' to prevent concurrent execution of placing the tag, and on Iceberg to store it.
>>>>>>
>>>>>> This does not use tags as intended, but it seems like a better solution than adding/removing table properties, which would clutter the table history with technical data.
>>>>>>
>>>>>> Your thoughts? Any other suggestions/solutions would be welcome.
>>>>>>
>>>>>> Thanks,
>>>>>> Peter
>>>>>>
>>>>>> [1] https://iceberg.apache.org/docs/latest/java-api-quickstart/#branching-and-tagging
>>>>>>
>>>>>> On Thu, Mar 28, 2024, 14:44 Péter Váry <peter.vary.apa...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Team,
>>>>>>>
>>>>>>> As discussed on yesterday's community sync, I am working on adding the possibility to run maintenance tasks on Iceberg tables to the Flink Iceberg connector. This will fix the small-files issue and, in the long run, help compact the high number of positional and equality deletes created by Flink tasks writing CDC data to Iceberg tables, without the need for Spark in the infrastructure.
>>>>>>>
>>>>>>> I did some planning and prototyping, and I am currently trying out the solution on a larger scale.
>>>>>>>
>>>>>>> I put together a document on how my current solution looks:
>>>>>>> https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit?usp=sharing
>>>>>>>
>>>>>>> I would love to hear your thoughts and feedback on this to find a good final solution.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Peter
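For those skimming the thread: the tag-based guard Péter proposes (place a '__flink_maintenance' tag when a trigger fires, remove it when the tasks finish, and hold back new triggers while the tag exists) can be sketched in plain Java. This is a minimal illustrative stand-in, not Iceberg API: the `Set` below substitutes for the table's tag list, and `tryAcquire`/`release` are hypothetical names; in the actual proposal, placing the tag would go through an atomic Iceberg table commit, with Flink's `forceNonParallel` ensuring a single writer.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the proposed tag-based maintenance guard (illustrative only).
public class MaintenanceTagGuard {
    static final String LOCK_TAG = "__flink_maintenance";

    private final Set<String> tags; // stand-in for the Iceberg table's tags

    MaintenanceTagGuard(Set<String> tags) {
        this.tags = tags;
    }

    // Called by the RateLimiter before firing a trigger: if the tag is
    // already present, a maintenance run is still in flight, so the
    // trigger is held back and changes keep accumulating.
    boolean tryAcquire() {
        // Set.add is atomic here; in the proposal, the atomicity would
        // come from Iceberg's atomic table commits instead.
        return tags.add(LOCK_TAG);
    }

    // Called when the maintenance task chain finishes: remove the tag
    // so the next trigger may fire.
    void release() {
        tags.remove(LOCK_TAG);
    }

    public static void main(String[] args) {
        Set<String> tableTags = ConcurrentHashMap.newKeySet();
        MaintenanceTagGuard guard = new MaintenanceTagGuard(tableTags);

        System.out.println(guard.tryAcquire()); // true: tag placed, tasks may run
        System.out.println(guard.tryAcquire()); // false: previous run still in flight
        guard.release();                        // execution finished, tag removed
        System.out.println(guard.tryAcquire()); // true: next trigger may fire
    }
}
```

The check-and-create step must be a single atomic operation, which is exactly why the proposal leans on `forceNonParallel` plus Iceberg's atomic commit requirement rather than a separate lock service.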