Sorry, this was meant for the Flink dev list. :(

On Tue, Apr 9, 2024, 06:48 Péter Váry <peter.vary.apa...@gmail.com> wrote:
> Forwarding the invite for the discussion we plan to do with the Iceberg
> folks, as some of you might be interested in this.
>
> ---------- Forwarded message ---------
> From: Brian Olsen <bitsondata...@gmail.com>
> Date: Mon, Apr 8, 2024, 18:29
> Subject: Re: Flink table maintenance
> To: <dev@iceberg.apache.org>
>
> Hey Iceberg nation,
>
> I would like to share details about the meeting this Wednesday to further
> discuss Péter's proposal on Flink Maintenance Tasks.
> Calendar link: https://calendar.app.google/83HGYWXoQJ8zXuVCA
>
> List discussion:
> https://lists.apache.org/thread/10mdf9zo6pn0dfq791nf4w1m7jh9k3sl
>
> Design doc: Flink table maintenance
> https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit?usp=sharing
>
> On Mon, Apr 1, 2024 at 8:52 PM Manu Zhang <owenzhang1...@gmail.com> wrote:
>
>> Hi Peter,
>>
>>> Are you proposing to create a user-facing locking feature in Iceberg,
>>> or just something for internal use?
>>
>> Since it's a general issue, I'm proposing to create a general user
>> interface first, while the implementation can be left to users. For
>> example, we use Airflow to schedule maintenance jobs, and we can check
>> in-progress jobs with the Airflow API. A Hive Metastore lock might be
>> another option we can implement for users.
>>
>> Thanks,
>> Manu
>>
>> On Tue, Apr 2, 2024 at 5:26 AM Péter Váry <peter.vary.apa...@gmail.com>
>> wrote:
>>
>>> Hi Ajantha,
>>>
>>> I thought about enabling post-commit-topology-based compaction for
>>> sinks using options, like we use for the parametrization of streaming
>>> reads [1].
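[Editor's note: the options-based parametrization with sensible defaults that Péter describes above could look roughly like the following sketch. The option names and the `CompactionOptions` class are purely hypothetical illustrations; none of them exist in Iceberg.]

```java
import java.util.Map;

// Hypothetical sketch: compaction settings arrive as connector options,
// the same way streaming-read options are parametrized today, and every
// parameter falls back to a sensible default when absent. All option
// names below are invented for illustration.
class CompactionOptions {
    static final String ENABLED = "compaction.enabled";
    static final String TARGET_FILE_SIZE = "compaction.target-file-size-bytes";
    static final String MIN_INPUT_FILES = "compaction.min-input-files";

    final boolean enabled;
    final long targetFileSizeBytes;
    final int minInputFiles;

    CompactionOptions(boolean enabled, long targetFileSizeBytes, int minInputFiles) {
        this.enabled = enabled;
        this.targetFileSizeBytes = targetFileSizeBytes;
        this.minInputFiles = minInputFiles;
    }

    // Parse the user-supplied options map, defaulting anything missing.
    static CompactionOptions from(Map<String, String> options) {
        return new CompactionOptions(
            Boolean.parseBoolean(options.getOrDefault(ENABLED, "false")),
            Long.parseLong(options.getOrDefault(
                TARGET_FILE_SIZE, String.valueOf(512L * 1024 * 1024))),
            Integer.parseInt(options.getOrDefault(MIN_INPUT_FILES, "5")));
    }
}
```

This illustrates why the option count is the pain point Péter mentions: every maintenance knob becomes one more string key that users must discover, which defaults can only partially hide.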
>>> I think it will be hard to do in a user-friendly way - because of the
>>> high number of parameters - but I think it is a possible solution with
>>> sensible defaults.
>>>
>>> There is a batch-like solution for data file compaction already
>>> available [2], but I do not see how we could extend Flink SQL to be
>>> able to call it.
>>>
>>> Writing to a branch using Flink SQL should be another thread, but my
>>> first guess is that it shouldn't be hard to implement using options,
>>> like:
>>> /*+ OPTIONS('branch'='b1') */
>>> since writing to a branch is already working through the Java API [3].
>>>
>>> Thanks, Peter
>>>
>>> 1 - https://iceberg.apache.org/docs/latest/flink-queries/#flink-streaming-read
>>> 2 - https://github.com/apache/iceberg/blob/820fc3ceda386149f42db8b54e6db9171d1a3a6d/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java
>>> 3 - https://iceberg.apache.org/docs/latest/flink-writes/#branch-writes
>>>
>>> On Mon, Apr 1, 2024, 16:30 Ajantha Bhat <ajanthab...@gmail.com> wrote:
>>>
>>>> Thanks for the proposal, Peter.
>>>>
>>>> I just wanted to know whether we have any plans to support SQL syntax
>>>> for table maintenance (like a CALL procedure) for pure Flink SQL users.
>>>> I didn't see any custom SQL parser plugin support in Flink. I also saw
>>>> that branch writes don't have SQL support (only branch reads use
>>>> options), so I am not sure about the roadmap for Iceberg SQL support
>>>> in Flink. Was there any discussion before?
>>>>
>>>> - Ajantha
>>>>
>>>> On Mon, Apr 1, 2024 at 7:51 PM Péter Váry <peter.vary.apa...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Manu,
>>>>>
>>>>> Just to clarify:
>>>>> - Are you proposing to create a user-facing locking feature in
>>>>> Iceberg, or just something for internal use?
>>>>>
>>>>> I think we shouldn't add locking to Iceberg's user-facing scope at
>>>>> this stage.
>>>>> A fully featured locking system has many more features that we would
>>>>> need (priorities, fairness, timeouts, etc.). I could be tempted when
>>>>> we are talking about the REST catalog, but I think that should be
>>>>> further down the road, if ever...
>>>>>
>>>>> About using the tags:
>>>>> - I wholeheartedly agree that using tags is not intuitive, and I see
>>>>> your points in most of your arguments. OTOH, introducing a new
>>>>> requirement (a locking mechanism) seems like the wrong direction to
>>>>> me.
>>>>> - We already defined a requirement (atomic changes on the table) for
>>>>> the Catalog implementations which could be used to achieve our goal
>>>>> here.
>>>>> - We also already store technical data in snapshot properties in
>>>>> Flink jobs (JobId, OperatorId, CheckpointId). Maybe technical
>>>>> tags/table properties are not a big stretch.
>>>>>
>>>>> Or we can look at these tags or metadata as 'technical data' which is
>>>>> internal to Iceberg, and shouldn't be expressed on the external API.
>>>>> My concern is:
>>>>> - Would it be used often enough to be worth the additional complexity?
>>>>>
>>>>> Knowing that Spark compaction is struggling with the same issue is a
>>>>> good indicator, but we would probably need more use cases before
>>>>> introducing a new feature with this complexity, or a simpler solution.
>>>>>
>>>>> Thanks, Peter
>>>>>
>>>>> On Mon, Apr 1, 2024, 10:18 Manu Zhang <owenzhang1...@gmail.com> wrote:
>>>>>
>>>>>>> What would the community think of exploiting tags for preventing
>>>>>>> concurrent maintenance loop executions?
>>>>>>
>>>>>> This issue is not specific to Flink maintenance jobs. We have a
>>>>>> service scheduling Spark maintenance jobs by watching table commits.
>>>>>> When we don't check for in-progress maintenance jobs on the same
>>>>>> table, multiple jobs will run concurrently and have conflicts.
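[Editor's note: the "general interfaces, implementation left to users" idea discussed in this thread might be sketched as below. The interface name, method names, and the in-memory implementation are all hypothetical; this is not an Iceberg API.]

```java
import java.util.concurrent.ConcurrentHashMap;

// A minimal sketch of a pluggable maintenance lock: the engine only calls
// tryLock()/unlock() around a maintenance run, while the implementation
// (an Airflow API check, a Hive Metastore lock, ...) is supplied by the
// user. All names here are invented for illustration.
interface TableMaintenanceLock {
    // Returns true if the maintenance lock for the given table was acquired.
    boolean tryLock(String tableName);

    void unlock(String tableName);
}

// Trivial single-process implementation, for illustration only. A real
// implementation would delegate to an external system.
class InMemoryMaintenanceLock implements TableMaintenanceLock {
    private final ConcurrentHashMap<String, Boolean> locked = new ConcurrentHashMap<>();

    @Override
    public boolean tryLock(String tableName) {
        // putIfAbsent returns null only when no lock entry was present.
        return locked.putIfAbsent(tableName, Boolean.TRUE) == null;
    }

    @Override
    public void unlock(String tableName) {
        locked.remove(tableName);
    }
}
```

Note how small the contract is: this is the shape of interface Manu suggests shipping first, deferring priorities, fairness, and timeouts (the features Péter lists above) to concrete implementations.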
>>>>>> Basically, I think we need a lock mechanism like the metastore lock
>>>>>> <https://iceberg.apache.org/docs/nightly/configuration/#hadoop-configuration>
>>>>>> if we want to handle it for users. However, using a tag doesn't look
>>>>>> intuitive to me, and we would be mixing user data with system
>>>>>> metadata. Maybe we can define some general interfaces and leave the
>>>>>> implementation to users in the first version.
>>>>>>
>>>>>> Regards,
>>>>>> Manu
>>>>>>
>>>>>> On Fri, Mar 29, 2024 at 1:59 PM Péter Váry
>>>>>> <peter.vary.apa...@gmail.com> wrote:
>>>>>>
>>>>>>> What would the community think of exploiting tags for preventing
>>>>>>> concurrent maintenance loop executions?
>>>>>>>
>>>>>>> The issue:
>>>>>>> Some maintenance tasks can't run in parallel, like DeleteOrphanFiles
>>>>>>> vs. ExpireSnapshots, or RewriteDataFiles vs. RewriteManifestFiles.
>>>>>>> We make sure not to run tasks started by a single trigger
>>>>>>> concurrently by serializing them, but there are no loops in Flink,
>>>>>>> so we can't synchronize with tasks started by the next trigger.
>>>>>>>
>>>>>>> In the document, I describe that we need to rely on the user to
>>>>>>> ensure that the rate limit is high enough to prevent concurrent
>>>>>>> triggers.
>>>>>>>
>>>>>>> Proposal:
>>>>>>> When firing a trigger, the RateLimiter could check for and create an
>>>>>>> Iceberg table tag [1] on the current table snapshot, with the name
>>>>>>> '__flink_maintenance'. When the execution finishes, we remove this
>>>>>>> tag. The RateLimiter keeps accumulating changes, and doesn't fire
>>>>>>> new triggers while it finds this tag on the table.
>>>>>>> The solution relies on Flink's 'forceNonParallel' to prevent
>>>>>>> concurrent execution of placing the tag, and on Iceberg to store it.
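[Editor's note: the tag-based gating proposed above can be sketched as follows, with the table's tag set modeled as an in-memory `Set`. In the actual proposal the tag lives on the Iceberg table and Flink's 'forceNonParallel' ensures only one instance of this logic runs at a time; the class and method names below are invented for illustration.]

```java
import java.util.Set;

// Sketch of the proposed RateLimiter gating: fire a trigger only when no
// '__flink_maintenance' tag marks a maintenance run as in-flight.
class RateLimiterGate {
    static final String MAINTENANCE_TAG = "__flink_maintenance";

    // Stand-in for the table's tags; the real proposal stores the tag in
    // Iceberg table metadata.
    private final Set<String> tableTags;

    RateLimiterGate(Set<String> tableTags) {
        this.tableTags = tableTags;
    }

    // Fire only when no maintenance run is in-flight; otherwise keep
    // accumulating changes and retry on the next trigger.
    boolean tryFire() {
        if (tableTags.contains(MAINTENANCE_TAG)) {
            return false; // previous maintenance loop still running
        }
        tableTags.add(MAINTENANCE_TAG); // ~ createTag on the current snapshot
        return true;
    }

    // Called when all maintenance tasks of the trigger have finished.
    void finish() {
        tableTags.remove(MAINTENANCE_TAG); // ~ removeTag
    }
}
```

The sketch also makes the failure mode visible: if a job dies between `tryFire()` and `finish()`, the tag is never removed and triggers stall until someone clears it, which is one reason a purpose-built lock (with timeouts) is raised as an alternative in this thread.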
>>>>>>> This does not use tags as intended, but it seems like a better
>>>>>>> solution than adding/removing table properties, which would clutter
>>>>>>> the table history with technical data.
>>>>>>>
>>>>>>> Your thoughts? Any other suggestions/solutions would be welcome.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Peter
>>>>>>>
>>>>>>> [1] https://iceberg.apache.org/docs/latest/java-api-quickstart/#branching-and-tagging
>>>>>>>
>>>>>>> On Thu, Mar 28, 2024, 14:44 Péter Váry <peter.vary.apa...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Team,
>>>>>>>>
>>>>>>>> As discussed on yesterday's community sync, I am working on adding
>>>>>>>> to the Flink Iceberg connector the possibility to run maintenance
>>>>>>>> tasks on Iceberg tables. This will fix the small-files issue and,
>>>>>>>> in the long run, help compact the high number of positional and
>>>>>>>> equality deletes created by Flink tasks writing CDC data to Iceberg
>>>>>>>> tables, without the need for Spark in the infrastructure.
>>>>>>>>
>>>>>>>> I did some planning and prototyping, and I am currently trying out
>>>>>>>> the solution on a larger scale.
>>>>>>>>
>>>>>>>> I put together a document on how my current solution looks:
>>>>>>>> https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit?usp=sharing
>>>>>>>>
>>>>>>>> I would love to hear your thoughts and feedback on this to find a
>>>>>>>> good final solution.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Peter