Hi Team,

Gen Luo and Zhu Zhu continued the discussion on the previous thread. See: https://lists.apache.org/thread/10mdf9zo6pn0dfq791nf4w1m7jh9k3sl. I am adding them to this thread too, so we can continue the discussion in one place. Based on their thoughts there, the current suggestion is:
Q: Do you have concerns with the Maintenance Tasks execution infrastructure?
A:
Topology:
- We should prioritize the Monitor based solution, as it is more general and better at separating resource usage from the main writer jobs
- The PostCommitTopology based integration has its own uses, but might have a lower priority
Checkpointing:
- We should use AsyncIO as much as possible to avoid blocking checkpoint creation
- Because of the current limitations of AsyncIO, unaligned checkpoints could be useful when the task size grows

Q: Do you have use-cases for other Maintenance Task(s) than the ones mentioned in the doc?
A: Data file rewrite (Global/Incremental), Expire snapshots, Manifest files rewrite, Delete orphan files - no new tasks came up

Q: Do you think more scheduling information is needed than mentioned in the doc?
A: Commit number, New data file number, New data file size, New delete file number, Elapsed time since the last run - no new scheduling information came up

Q: How to prevent concurrent Maintenance Tasks?
A: We discussed different possible solutions, but the best bet is still the tag based locking (a rough sketch of how this could look follows below, after Brian's mail)

I have yet to hear back from most of you on Flink table maintenance. If you have time, please share your thoughts. If there are no more concerns, I would love to close this part of the design and move forward with the implementation.

Thanks,
Peter

On Fri, Apr 12, 2024 at 7:58 AM Brian Olsen <bitsondata...@gmail.com> wrote:

> Hey everyone,
>
> Following up from this meeting, we decided that, due to the distributed nature of everyone involved with Flink, it would be best to achieve consensus on some critical points that Péter outlined for us.
>
> - Do you have concerns with the Maintenance Tasks execution infrastructure? The current plan is:
>   - Streaming tasks to continuously execute the Maintenance Tasks
>   - SinkV2 PostCommitTopology or Monitor service to collect the changes
>   - Scheduler to decide which Maintenance Task(s) to run
>   - Serialized Maintenance Task execution
>
> - Do you have use-cases for other Maintenance Task(s) than mentioned in the doc? Do you have different priorities?
>   - Data file rewrite
>     - Global for rewriting existing and new files - equality/positional delete removal, repartitioning, etc.
>     - Incremental for rewriting only new files - no metadata read needed
>   - Expire snapshots
>   - Manifest files rewrite
>   - Delete orphan files
>   - Positional delete files - probably not needed for Flink
>
> - Do you think more scheduling information is needed than mentioned in the doc?
>   - Commit number
>   - New data file number
>   - New data file size
>   - New delete file number
>   - Elapsed time since the last run
>
> - How to prevent concurrent Maintenance Tasks?
>   - Just run, and retry - will clutter the logs with failures and waste resources
>   - External locking
>   - Using Iceberg table tags to prevent concurrent runs - mixing user data with technical data
>   - We need a better solution :smile:
>
> I will be following up with these members if I don't see them reply in the following week. If you have no comment and you are on this list, please reply 'looks good' to acknowledge that you have seen it. Anyone is free to chime in; we just wanted a list of required eyes before assuming consensus.
>
> Péter
> Ryan Blue
> Daniel C. Weeks
> Russell Spitzer
> Steven Wu
> Bryan Keller
> Manu Zhang
> Ajantha Bhat
> Yanghao Lin
>
> Thanks everyone!!
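To make the tag based locking option more concrete, here is a rough sketch of how it could look with the Iceberg Java API. The helper class, the method names and the conflict handling are illustrative only (the tag name is the one proposed earlier in the thread); this is not the final design:

import org.apache.iceberg.Table;

// Illustrative sketch of the tag based "lock" discussed above.
// MaintenanceTagLock is a hypothetical helper, not an existing class.
public class MaintenanceTagLock {
  private static final String LOCK_TAG = "__flink_maintenance";

  private final Table table;

  public MaintenanceTagLock(Table table) {
    this.table = table;
  }

  /** Returns true if the tag was placed, false if another maintenance run is in progress. */
  public boolean tryAcquire() {
    table.refresh();
    if (table.currentSnapshot() == null || table.refs().containsKey(LOCK_TAG)) {
      return false; // nothing to maintain yet, or another run already holds the "lock"
    }
    // The catalog's atomic metadata commit is expected to reject this if a
    // concurrent run managed to place the tag first.
    table.manageSnapshots()
        .createTag(LOCK_TAG, table.currentSnapshot().snapshotId())
        .commit();
    return true;
  }

  /** Removes the tag once the maintenance tasks started by this trigger have finished. */
  public void release() {
    table.manageSnapshots().removeTag(LOCK_TAG).commit();
  }
}

The RateLimiter would call tryAcquire() before firing a trigger and keep accumulating changes while the tag is present; forceNonParallel on the Flink side keeps the tag placement itself single-threaded.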
> On Thu, Apr 11, 2024 at 7:13 AM wenjin <wenjin...@gmail.com> wrote:
>
>> Hi Peter,
>>
>> I am interested in your proposal and have clarified some of my confusion with your help in the Flink community; thanks for your answers.
>>
>> I participated in yesterday's discussion, but since I am not a native English speaker, I am concerned that I may have missed some details regarding the follow-up actions for this proposal. So, if possible, please involve me when there are further discussions or updates.
>>
>> Thanks,
>> wenjin
>>
>> On 2024/04/09 04:48:48 Péter Váry wrote:
>> > Forwarding the invite for the discussion we plan to have with the Iceberg folks, as some of you might be interested in this.
>> >
>> > ---------- Forwarded message ---------
>> > From: Brian Olsen <bi...@gmail.com>
>> > Date: Mon, Apr 8, 2024, 18:29
>> > Subject: Re: Flink table maintenance
>> > To: <de...@iceberg.apache.org>
>> >
>> > Hey Iceberg nation,
>> >
>> > I would like to share details about the meeting this Wednesday to further discuss Péter's proposal on Flink Maintenance Tasks.
>> > Calendar Link: https://calendar.app.google/83HGYWXoQJ8zXuVCA
>> >
>> > List discussion: https://lists.apache.org/thread/10mdf9zo6pn0dfq791nf4w1m7jh9k3sl
>> >
>> > Design Doc: Flink table maintenance
>> > https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit?usp=sharing
>> >
>> > On Mon, Apr 1, 2024 at 8:52 PM Manu Zhang <ow...@gmail.com> wrote:
>> > > Hi Peter,
>> > >
>> > > > Are you proposing to create a user facing locking feature in Iceberg, or just something for internal use?
>> > >
>> > > Since it's a general issue, I'm proposing to create a general user interface first, while the implementation can be left to users. For example, we use Airflow to schedule maintenance jobs, and we can check in-progress jobs with the Airflow API. A Hive metastore lock might be another option we can implement for users.
>> > >
>> > > Thanks,
>> > > Manu
>> > >
>> > > On Tue, Apr 2, 2024 at 5:26 AM Péter Váry <pe...@gmail.com> wrote:
>> > >
>> > >> Hi Ajantha,
>> > >>
>> > >> I thought about enabling post commit topology based compaction for sinks using options, like we use for the parametrization of streaming reads [1]. I think it will be hard to do in a user friendly way - because of the high number of parameters - but I think it is a possible solution with sensible defaults.
>> > >>
>> > >> There is a batch-like solution for data file compaction already available [2], but I do not see how we could extend Flink SQL to be able to call it.
>> > >>
>> > >> Writing to a branch using Flink SQL should be another thread, but my first guess is that it shouldn't be hard to implement using options, like: /*+ OPTIONS('branch'='b1') */
>> > >> Since writing to a branch is already working through the Java API [3].
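The branch write that [3] below refers to looks roughly like the following through the Java API. This is only a sketch: the input stream, the catalog loader and the db/table names are placeholders, and the branch name simply reuses the 'b1' example above, while the SQL-side 'branch' option is still just a proposal:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

// Sketch of a Java API branch write; names and the surrounding job setup are placeholders.
public class BranchWriteSketch {
  public static void appendToBranch(DataStream<RowData> input, CatalogLoader catalogLoader) {
    TableLoader tableLoader =
        TableLoader.fromCatalog(catalogLoader, TableIdentifier.of("db", "tbl"));

    FlinkSink.forRowData(input)
        .tableLoader(tableLoader)
        .toBranch("b1") // branch writes already work through the Java API
        .append();
  }
}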
>> > >>
>> > >> Thanks, Peter
>> > >>
>> > >> 1 - https://iceberg.apache.org/docs/latest/flink-queries/#flink-streaming-read
>> > >> 2 - https://github.com/apache/iceberg/blob/820fc3ceda386149f42db8b54e6db9171d1a3a6d/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java
>> > >> 3 - https://iceberg.apache.org/docs/latest/flink-writes/#branch-writes
>> > >>
>> > >> On Mon, Apr 1, 2024, 16:30 Ajantha Bhat <aj...@gmail.com> wrote:
>> > >>
>> > >>> Thanks for the proposal, Peter.
>> > >>>
>> > >>> I just wanted to know whether we have any plans to support SQL syntax for table maintenance (like CALL procedures) for pure Flink SQL users?
>> > >>> I didn't see any custom SQL parser plugin support in Flink. I also saw that branch writes don't have SQL support (only branch reads use options), so I am not sure about the roadmap of Iceberg SQL support in Flink.
>> > >>> Was there any discussion about this before?
>> > >>>
>> > >>> - Ajantha
>> > >>>
>> > >>> On Mon, Apr 1, 2024 at 7:51 PM Péter Váry <pe...@gmail.com> wrote:
>> > >>>
>> > >>>> Hi Manu,
>> > >>>>
>> > >>>> Just to clarify:
>> > >>>> - Are you proposing to create a user facing locking feature in Iceberg, or just something for internal use?
>> > >>>>
>> > >>>> I think we shouldn't add locking to Iceberg's user facing scope at this stage. A fully featured locking system has many more features that we would need to implement (priorities, fairness, timeouts, etc.). I could be tempted when we are talking about the REST catalog, but I think that should be further down the road, if ever...
>> > >>>>
>> > >>>> About using the tags:
>> > >>>> - I whole-heartedly agree that using tags is not intuitive, and I see your points in most of your arguments. OTOH, introducing a new requirement (a locking mechanism) seems like the wrong direction to me.
>> > >>>> - We already defined a requirement (atomic changes on the table) for the Catalog implementations which could be used to achieve our goal here.
>> > >>>> - We also already store technical data in snapshot properties in Flink jobs (JobId, OperatorId, CheckpointId). Maybe technical tags/table properties are not a big stretch.
>> > >>>>
>> > >>>> Or we can look at these tags or metadata as 'technical data' which is internal to Iceberg and shouldn't be expressed on the external API. My concern is:
>> > >>>> - Would it be used often enough to be worth the additional complexity?
>> > >>>>
>> > >>>> Knowing that Spark compaction is struggling with the same issue is a good indicator, but we would probably need more use cases before introducing a new feature with this complexity, or a simpler solution.
>> > >>>>
>> > >>>> Thanks, Peter
>> > >>>>
>> > >>>> On Mon, Apr 1, 2024, 10:18 Manu Zhang <ow...@gmail.com> wrote:
>> > >>>>
>> > >>>>> > What would the community think of exploiting tags for preventing concurrent maintenance loop executions.
>> > >>>>>
>> > >>>>> This issue is not specific to Flink maintenance jobs. We have a service scheduling Spark maintenance jobs by watching table commits. When we don't check in-progress maintenance jobs for the same table, multiple jobs will run concurrently and have conflicts.
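For reference, the batch-like data file compaction that Péter referenced as [2] above is the Flink RewriteDataFilesAction. A minimal sketch of invoking it, assuming a Hadoop-path table for simplicity; the class name and the table location are placeholders, and loading the table would depend on the actual catalog setup:

import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.actions.Actions;

// Sketch of the existing batch-like compaction ([2] above); the table path is a placeholder.
public class RewriteDataFilesSketch {
  public static void main(String[] args) {
    TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/db/tbl");
    tableLoader.open();
    Table table = tableLoader.loadTable();

    // Rewrites (compacts) the data files of the table as a one-shot batch job.
    Actions.forTable(table)
        .rewriteDataFiles()
        .execute();
  }
}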
>> > >>>>>
>> > >>>>> Basically, I think we need a lock mechanism like the metastore lock (https://iceberg.apache.org/docs/nightly/configuration/#hadoop-configuration) if we want to handle it for users. However, using tags doesn't look intuitive to me, and we would also be mixing user data with system metadata.
>> > >>>>> Maybe we can define some general interfaces and leave the implementation to users in the first version.
>> > >>>>>
>> > >>>>> Regards,
>> > >>>>> Manu
>> > >>>>>
>> > >>>>> On Fri, Mar 29, 2024 at 1:59 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>> > >>>>>
>> > >>>>>> What would the community think of exploiting tags for preventing concurrent maintenance loop executions?
>> > >>>>>>
>> > >>>>>> The issue:
>> > >>>>>> Some maintenance tasks can't run in parallel, like DeleteOrphanFiles vs. ExpireSnapshots, or RewriteDataFiles vs. RewriteManifestFiles. We make sure not to run tasks started by a single trigger concurrently by serializing them, but there are no loops in Flink, so we can't synchronize tasks started by the next trigger.
>> > >>>>>>
>> > >>>>>> In the document, I describe that we need to rely on the user to ensure that the rate limit is high enough to prevent concurrent triggers.
>> > >>>>>>
>> > >>>>>> Proposal:
>> > >>>>>> When firing a trigger, the RateLimiter could check for and create an Iceberg table tag [1] on the current table snapshot, with the name '__flink_maintenance'. When the execution finishes, we remove this tag. The RateLimiter keeps accumulating changes and doesn't fire new triggers as long as it finds this tag on the table.
>> > >>>>>> The solution relies on Flink's 'forceNonParallel' to prevent concurrent execution of placing the tag, and on Iceberg to store it.
>> > >>>>>>
>> > >>>>>> This does not use tags as intended, but it seems like a better solution than adding/removing table properties, which would clutter the table history with technical data.
>> > >>>>>>
>> > >>>>>> Your thoughts? Any other suggestions/solutions would be welcome.
>> > >>>>>>
>> > >>>>>> Thanks,
>> > >>>>>> Peter
>> > >>>>>>
>> > >>>>>> [1] https://iceberg.apache.org/docs/latest/java-api-quickstart/#branching-and-tagging
>> > >>>>>>
>> > >>>>>> On Thu, Mar 28, 2024, 14:44 Péter Váry <pe...@gmail.com> wrote:
>> > >>>>>>
>> > >>>>>>> Hi Team,
>> > >>>>>>>
>> > >>>>>>> As discussed on yesterday's community sync, I am working on adding the possibility to run maintenance tasks on Iceberg tables to the Flink Iceberg connector. This will fix the small files issue and, in the long run, help compact the high number of positional and equality deletes created by Flink tasks writing CDC data to Iceberg tables, without needing Spark in the infrastructure.
>> > >>>>>>>
>> > >>>>>>> I did some planning and prototyping, and I am currently trying out the solution at a larger scale.
>> > >>>>>>>
>> > >>>>>>> I put together a document describing how my current solution looks:
>> > >>>>>>> https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit?usp=sharing
>> > >>>>>>>
>> > >>>>>>> I would love to hear your thoughts and feedback on this to find a good final solution.
>> > >>>>>>>
>> > >>>>>>> Thanks,
>> > >>>>>>> Peter
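For reference, the option-based parametrization of streaming reads that Péter's [1] link refers to is set through SQL hints. A minimal sketch, assuming an already registered Iceberg catalog; the catalog/table names and the interval value are placeholders, and the idea in the thread is that a sink-side post commit compaction could expose its knobs through similar options:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Sketch of option-based parametrization of an Iceberg streaming read via SQL hints.
// The catalog/table names are placeholders; the catalog is assumed to be registered already.
public class StreamingReadOptionsSketch {
  public static void main(String[] args) {
    TableEnvironment tEnv =
        TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

    // 'streaming' and 'monitor-interval' are documented Iceberg Flink read options;
    // a post commit compaction could be parametrized through similar options with sensible defaults.
    tEnv.executeSql(
        "SELECT * FROM iceberg_catalog.db.tbl "
            + "/*+ OPTIONS('streaming'='true', 'monitor-interval'='60s') */");
  }
}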