Hey everyone,

Following up from this meeting: because the people involved on the Flink side are so distributed, we decided it would be best to reach consensus on some critical points that Péter outlined for us.

- Do you have concerns with the Maintenance Tasks execution infrastructure? The current plan is:
  - Streaming tasks to continuously execute the Maintenance Tasks
  - SinkV2 PostCommitTopology or Monitor service to collect the changes
  - Scheduler to decide which Maintenance Task(s) to run
  - Serialized Maintenance Task execution
- Do you have use-cases for Maintenance Task(s) other than those mentioned in the doc? Do you have different priorities?
  - Data file rewrite
    - Global for rewriting existing and new files - equality/positional delete removal, repartitioning etc.
    - Incremental for rewriting only new files - no metadata read needed
  - Expire snapshots
  - Manifest files rewrite
  - Delete orphan files
  - Positional delete files - probably not needed for Flink
- Do you think more scheduling information is needed than mentioned in the doc?
  - Commit number
  - New data file number
  - New data file size
  - New delete file number
  - Elapsed time since the last run
- How to prevent concurrent Maintenance Tasks?
  - Just run, and retry - will clutter the logs with failures, and waste resources
  - External locking
  - Using Iceberg table tags to prevent concurrent runs - mixing user data with technical data
  - We need a better solution :)

I will be following up with these members if I don't see them reply in the following week. If you have no comment and you are on this list, please reply 'looks good' to acknowledge you have seen it. Anyone is free to chime in; we just wanted a list of required eyes before assuming consensus.

Péter
Ryan Blue
Daniel C. Weeks
Russell Spitzer
Steven Wu
Bryan Keller
Manu Zhang
Ajantha Bhat
Yanghao Lin

Thanks everyone!!

On Thu, Apr 11, 2024 at 7:13 AM wenjin <wenjin...@gmail.com> wrote:

> Hi Peter,
>
> I am interested in your proposal and have cleared up some confusion with
> your help in the Flink community, thanks for your answers.
>
> I participated in yesterday’s discussion, but since I am not a native
> English speaker, I am concerned that I may have missed some details
> regarding the follow-up actions on this proposal. So, if possible, please
> involve me when there are further discussions or updates.
>
> Thanks,
> wenjin
>
> On 2024/04/09 04:48:48 Péter Váry wrote:
> > Forwarding the invite for the discussion we plan to do with the Iceberg
> > folks, as some of you might be interested in this.
> >
> > ---------- Forwarded message ---------
> > From: Brian Olsen <bi...@gmail.com>
> > Date: Mon, Apr 8, 2024, 18:29
> > Subject: Re: Flink table maintenance
> > To: <de...@iceberg.apache.org>
> >
> >
> > Hey Iceberg nation,
> >
> > I would like to share the meeting this Wednesday, where we will further
> > discuss the details of Péter's proposal on Flink Maintenance Tasks.
> > Calendar Link: https://calendar.app.google/83HGYWXoQJ8zXuVCA
> >
> > List discussion:
> > https://lists.apache.org/thread/10mdf9zo6pn0dfq791nf4w1m7jh9k3sl
> > <https://www.google.com/url?q=https://lists.apache.org/thread/10mdf9zo6pn0dfq791nf4w1m7jh9k3sl&sa=D&source=calendar&usd=2&usg=AOvVaw2-aePIRr6APFVHpRDipMgX>
> >
> > Design Doc: Flink table maintenance
> > <https://www.google.com/url?q=https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit?usp%3Dsharing&sa=D&source=calendar&usd=2&usg=AOvVaw1oLYQP76-G1ZEOW5pTxV1M>
> >
> >
> > On Mon, Apr 1, 2024 at 8:52 PM Manu Zhang <ow...@gmail.com> wrote:
> >
> > > Hi Peter,
> > >
> > >> Are you proposing to create a user facing locking feature in Iceberg, or
> > >> just something for internal use?
> > >>
> > >
> > > Since it's a general issue, I'm proposing to create a general user
> > > interface first, while the implementation can be left to users. For
> > > example, we use Airflow to schedule maintenance jobs and we can check
> > > in-progress jobs with the Airflow API.
> > > A Hive metastore lock might be another option we can implement for users.
> > >
> > > Thanks,
> > > Manu
> > >
> > > On Tue, Apr 2, 2024 at 5:26 AM Péter Váry <pe...@gmail.com>
> > > wrote:
> > >
> > >> Hi Ajantha,
> > >>
> > >> I thought about enabling post commit topology based compaction for sinks
> > >> using options, like we use for the parametrization of streaming reads [1].
> > >> I think it will be hard to do it in a user friendly way, because of the
> > >> high number of parameters, but I think it is a possible solution with
> > >> sensible defaults.
> > >>
> > >> There is a batch-like solution for data file compaction already available
> > >> [2], but I do not see how we could extend Flink SQL to be able to call it.
> > >>
> > >> Writing to a branch using Flink SQL should be another thread, but by my
> > >> first guess, it shouldn't be hard to implement using options, like:
> > >> /*+ OPTIONS('branch'='b1') */
> > >> since writing to a branch is already working through the Java API [3].
> > >>
> > >> Thanks, Peter
> > >>
> > >> 1 -
> > >> https://iceberg.apache.org/docs/latest/flink-queries/#flink-streaming-read
> > >> 2 -
> > >> https://github.com/apache/iceberg/blob/820fc3ceda386149f42db8b54e6db9171d1a3a6d/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java
> > >> 3 -
> > >> https://iceberg.apache.org/docs/latest/flink-writes/#branch-writes
> > >>
> > >> On Mon, Apr 1, 2024, 16:30 Ajantha Bhat <aj...@gmail.com> wrote:
> > >>
> > >>> Thanks for the proposal Peter.
> > >>>
> > >>> I just wanted to know: do we have any plans for supporting SQL syntax for
> > >>> table maintenance (like CALL procedure) for pure Flink SQL users?
> > >>> I didn't see any custom SQL parser plugin support in Flink. I also saw
> > >>> that Branch write doesn't have SQL support (only Branch reads use Option),
> > >>> so I am not sure about the roadmap of Iceberg SQL support in Flink.
> > >>> Was there any discussion before?
> > >>>
> > >>> - Ajantha
> > >>>
> > >>> On Mon, Apr 1, 2024 at 7:51 PM Péter Váry <pe...@gmail.com>
> > >>> wrote:
> > >>>
> > >>>> Hi Manu,
> > >>>>
> > >>>> Just to clarify:
> > >>>> - Are you proposing to create a user facing locking feature in Iceberg,
> > >>>> or just something for internal use?
> > >>>>
> > >>>> I think we shouldn't add locking to Iceberg's user facing scope at this
> > >>>> stage. A fully featured locking system has many more features that we need
> > >>>> (priorities, fairness, timeouts etc). I could be tempted when we are
> > >>>> talking about the REST catalog, but I think that should be further down the
> > >>>> road, if ever...
> > >>>>
> > >>>> About using the tags:
> > >>>> - I whole-heartedly agree that using tags is not intuitive, and I see
> > >>>> your points in most of your arguments. OTOH, introducing a new requirement
> > >>>> (a locking mechanism) seems like the wrong direction to me.
> > >>>> - We already defined a requirement (atomic changes on the table) for
> > >>>> the Catalog implementations which could be used to achieve our goal here.
> > >>>> - We also already store technical data in snapshot properties in Flink
> > >>>> jobs (JobId, OperatorId, CheckpointId). Maybe technical tags/table
> > >>>> properties are not a big stretch.
> > >>>>
> > >>>> Or we can look at these tags or metadata as 'technical data' which is
> > >>>> internal to Iceberg, and shouldn't be exposed on the external API. My
> > >>>> concern is:
> > >>>> - Would it be used often enough to be worth the additional complexity?
> > >>>>
> > >>>> Knowing that Spark compaction is struggling with the same issue is a
> > >>>> good indicator, but probably we would need more use cases for introducing a
> > >>>> new feature with this complexity, or a simpler solution.
> > >>>>
> > >>>> Thanks, Peter
> > >>>>
> > >>>>
> > >>>> On Mon, Apr 1, 2024, 10:18 Manu Zhang <ow...@gmail.com> wrote:
> > >>>>
> > >>>>>> What would the community think of exploiting tags for preventing
> > >>>>>> concurrent maintenance loop executions.
> > >>>>>
> > >>>>>
> > >>>>> This issue is not specific to Flink maintenance jobs. We have a
> > >>>>> service scheduling Spark maintenance jobs by watching table commits. When
> > >>>>> we don't check in-progress maintenance jobs for the same table, multiple
> > >>>>> jobs will run concurrently and have conflicts.
> > >>>>>
> > >>>>> Basically, I think we need a lock mechanism like the metastore lock
> > >>>>> <https://iceberg.apache.org/docs/nightly/configuration/#hadoop-configuration>
> > >>>>> if we want to handle it for users. However, using TAG doesn't look
> > >>>>> intuitive to me. We are also mixing user data with system metadata.
> > >>>>> Maybe we can define some general interfaces and leave the
> > >>>>> implementation to users in the first version.
> > >>>>>
> > >>>>> Regards,
> > >>>>> Manu
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>> On Fri, Mar 29, 2024 at 1:59 PM Péter Váry <
> > >>>>> peter.vary.apa...@gmail.com> wrote:
> > >>>>>
> > >>>>>> What would the community think of exploiting tags for preventing
> > >>>>>> concurrent maintenance loop executions.
> > >>>>>>
> > >>>>>> The issue:
> > >>>>>> Some maintenance tasks cannot run in parallel, like DeleteOrphanFiles
> > >>>>>> vs. ExpireSnapshots, or RewriteDataFiles vs. RewriteManifestFiles. We make
> > >>>>>> sure not to run tasks started by a single trigger concurrently by
> > >>>>>> serializing them, but there are no loops in Flink, so we can't synchronize
> > >>>>>> tasks started by the next trigger.
> > >>>>>>
> > >>>>>> In the document, I describe that we need to rely on the user to
> > >>>>>> ensure that the rate limit is high enough to prevent concurrent triggers.
> > >>>>>>
> > >>>>>> Proposal:
> > >>>>>> When firing a trigger, RateLimiter could check and create an Iceberg
> > >>>>>> table tag [1] for the current table snapshot, with the name:
> > >>>>>> '__flink_maitenance'. When the execution finishes we remove this tag. The
> > >>>>>> RateLimiter keeps accumulating changes, and doesn't fire new triggers while
> > >>>>>> it finds this tag on the table.
> > >>>>>> The solution relies on Flink 'forceNonParallel' to prevent concurrent
> > >>>>>> execution of placing the tag, and on Iceberg to store it.
> > >>>>>>
> > >>>>>> This does not use the tags as intended, but seems like a better solution
> > >>>>>> than adding/removing table properties which would clutter the table history
> > >>>>>> with technical data.
> > >>>>>>
> > >>>>>> Your thoughts? Any other suggestions/solutions would be welcome.
> > >>>>>>
> > >>>>>> Thanks,
> > >>>>>> Peter
> > >>>>>>
> > >>>>>> [1]
> > >>>>>> https://iceberg.apache.org/docs/latest/java-api-quickstart/#branching-and-tagging
> > >>>>>>
> > >>>>>> On Thu, Mar 28, 2024, 14:44 Péter Váry <pe...@gmail.com>
> > >>>>>> wrote:
> > >>>>>>
> > >>>>>>> Hi Team,
> > >>>>>>>
> > >>>>>>> As discussed on yesterday's community sync, I am working on adding a
> > >>>>>>> possibility to the Flink Iceberg connector to run maintenance tasks on the
> > >>>>>>> Iceberg tables. This will fix the small files issues and in the long run
> > >>>>>>> help compact the high number of positional and equality deletes created
> > >>>>>>> by Flink tasks writing CDC data to Iceberg tables, without the need for Spark
> > >>>>>>> in the infrastructure.
> > >>>>>>>
> > >>>>>>> I did some planning and prototyping, and I am currently trying out the
> > >>>>>>> solution on a larger scale.
> > >>>>>>>
> > >>>>>>> I put together a document describing what my current solution looks like:
> > >>>>>>>
> > >>>>>>> https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit?usp=sharing
> > >>>>>>>
> > >>>>>>> I would love to hear your thoughts and feedback on this to find a
> > >>>>>>> good final solution.
> > >>>>>>>
> > >>>>>>> Thanks,
> > >>>>>>> Peter
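
For anyone catching up on the tag-based guard proposed earlier in this thread, here is a rough sketch of the control flow, written in Python only for brevity. Everything in it is an assumption made for illustration: FakeTable is an in-memory stand-in rather than the Iceberg API, and the RateLimiter class, its min_commits threshold, and the method names are hypothetical. The tag name is copied as spelled in the proposal. In a real job, the check-and-create step would rely on Flink's 'forceNonParallel' and on an atomic catalog commit, as the proposal describes.

```python
from dataclasses import dataclass, field

# Tag name copied verbatim from the proposal in this thread.
LOCK_TAG = "__flink_maitenance"


@dataclass
class FakeTable:
    """In-memory stand-in for an Iceberg table (assumption, NOT the Iceberg API)."""
    current_snapshot_id: int = 1
    tags: dict = field(default_factory=dict)  # tag name -> snapshot id

    def create_tag(self, name, snapshot_id):
        # Models an atomic commit: creating an already existing tag fails.
        if name in self.tags:
            raise ValueError(f"tag {name} already exists")
        self.tags[name] = snapshot_id

    def remove_tag(self, name):
        self.tags.pop(name, None)


@dataclass
class RateLimiter:
    """Hypothetical trigger gate: accumulates commits, fires only when no tag is set."""
    table: FakeTable
    min_commits: int = 3       # illustrative threshold, not from the design doc
    pending_commits: int = 0

    def on_commit(self):
        """Record one table commit; return True if a maintenance run is triggered."""
        self.pending_commits += 1
        if self.pending_commits < self.min_commits:
            return False
        if LOCK_TAG in self.table.tags:
            # A previous run is still in progress: keep accumulating changes.
            return False
        self.table.create_tag(LOCK_TAG, self.table.current_snapshot_id)
        self.pending_commits = 0
        return True

    def on_maintenance_finished(self):
        """Called when the serialized maintenance tasks have completed."""
        self.table.remove_tag(LOCK_TAG)
```

With min_commits=2, the second commit fires a trigger and places the tag; later commits only accumulate until on_maintenance_finished() removes the tag, after which the next commit fires again. The sketch does not cover the case raised in the thread where a job dies while holding the tag, which is one reason a timeout or an external lock was discussed.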