Hi all, thanks for raising this wonderful thread, and thanks to @peter.vary.apa...@gmail.com; the doc really makes things clear.
I have several questions to catch up on, and I didn't find answers in the document. I would appreciate any feedback, since these issues are really important for Flink production environments.

1. Will the commit-topology approach block stream processing when maintenance tasks, especially compaction, run too slowly or resources (subtask thread count) are insufficient? This is very much a Flink concern, and I wonder whether it can be addressed within the commit topology. Perhaps synchronizing and buffering events could mitigate it; I think this deserves serious discussion for production use cases.

2. How do we handle CRUD operations while Flink stream processing is running? For example, I could run a Spark 'UPDATE' SQL statement while a Flink ingestion job is writing to the same table. Will the maintenance tasks cover the files produced by such CRUD operations, or only process events from upstream operators? From my point of view, a single monitor job seems to work for all cases, while the commit-topology approach assumes that only this Flink job writes to the table.

3. In our scenarios, we found that equality-delete files heavily impact read performance, while the impact of (small enough) positional-delete files is negligible. Should the design take this phenomenon into account?

I am new to this thread, and our team has some experience with Flink-on-Iceberg improvements. It would be my honor to join this effort and contribute. Is there a separate Flink dev mailing list for Iceberg, or does everything go through the dev list? How and when should a discussion be set up?

Péter Váry <peter.vary.apa...@gmail.com> wrote on Mon, Apr 22, 2024 at 18:22:

> Hi Team,
>
> The discussion continued on the previous thread by Gen Luo and Zhu Zhu.
> See: https://lists.apache.org/thread/10mdf9zo6pn0dfq791nf4w1m7jh9k3sl.
> Adding them to this thread too, so we can continue the discussion in one
> place. Based on their thoughts there, the current suggestion is:
>
> Q: Do you have concerns with the Maintenance Tasks execution
> infrastructure?
> A:
> Topology:
>
>    - We should prioritize the Monitor based solution as it is more
>    general, and better at separating resource usage from the main writer jobs
>    - PostCommitTopology based integration has its own uses, but might
>    have a lower priority
>
> Checkpointing:
>
>    - We should consider AsyncIO as much as possible to prevent blocking
>    of checkpoint creation
>    - Because of the current limitations of AsyncIO, unaligned checkpoints
>    could be useful when the task size grows
>
> Q: Do you have use-cases for other Maintenance Task(s) than mentioned in
> the doc?
> A: Data file rewrite (Global/Incremental), Expire snapshots, Manifest
> files rewrite, Delete orphan files - no new tasks came up
>
> Q: Do you think more scheduling information is needed than mentioned in
> the doc?
> A: Commit number, New data file number, New data file size, New delete
> file number, Elapsed time since the last run - no new scheduling
> information came up
>
> Q: How to prevent concurrent Maintenance Tasks?
> A: Discussed different possible solutions, but the best bet is still the
> tag based locking
>
> I have yet to hear back from most of you on Flink table maintenance. If
> you have time, please share your thoughts.
> If there are no more concerns, I would love to close this part of the
> design and move forward with the implementation.
>
> Thanks,
> Peter
>
> Brian Olsen <bitsondata...@gmail.com> wrote on Fri, Apr 12, 2024 at 7:58:
>
>> Hey everyone,
>>
>> Following up from this meeting, we decided that, due to the distributed
>> nature of everyone involved on Flink, it would be best to achieve
>> consensus on some critical points that Péter outlined for us.
>>
>> - Do you have concerns with the Maintenance Tasks execution
>> infrastructure?
>> The current plan is:
>>
>>    - Streaming tasks to continuously execute the Maintenance Tasks
>>    - SinkV2 PostCommitTopology or Monitor service to collect the changes
>>    - Scheduler to decide which Maintenance Task(s) to run
>>    - Serialized Maintenance Task execution
>>
>> - Do you have use-cases for other Maintenance Task(s) than mentioned
>> in the doc? Do you have different priorities?
>>
>>    - Data file rewrite
>>       - Global for rewriting existing and new files - equality/positional
>>       delete removal, repartitioning, etc.
>>       - Incremental for rewriting only new files - no metadata read needed
>>    - Expire snapshots
>>    - Manifest files rewrite
>>    - Delete orphan files
>>    - Positional delete files - probably not needed for Flink
>>
>> - Do you think more scheduling information is needed than mentioned in
>> the doc?
>>
>>    - Commit number
>>    - New data file number
>>    - New data file size
>>    - New delete file number
>>    - Elapsed time since the last run
>>
>> - How to prevent concurrent Maintenance Tasks?
>>
>>    - Just run, and retry - will clutter the logs with failures, and
>>    waste resources
>>    - External locking
>>    - Using Iceberg table tags to prevent concurrent runs - mixing user
>>    data with technical data
>>    - We need a better solution :smile:
>>
>> I will be following up with these members if I don't see them reply in
>> the following week. If you have no comment and you are on this list, please
>> reply 'looks good' to acknowledge you have seen it. Anyone is free to chime
>> in, we just wanted a list of required eyes before assuming consensus.
>>
>> Péter
>> Ryan Blue
>> Daniel C. Weeks
>> Russell Spitzer
>> Steven Wu
>> Bryan Keller
>> Manu Zhang
>> Ajantha Bhat
>> Yanghao Lin
>>
>> Thanks everyone!!
>>
>> On Thu, Apr 11, 2024 at 7:13 AM wenjin <wenjin...@gmail.com> wrote:
>>
>>> Hi Peter,
>>>
>>> I am interested in your proposal and have clarified some confusions with
>>> your help in the Flink community; thanks for your answers.
>>>
>>> I participated in yesterday's discussion, but since I am not a native
>>> English speaker, I am concerned that I may have missed some details
>>> regarding the follow-up actions on this proposal. So, if possible, please
>>> involve me when there are further discussions or updates.
>>>
>>> Thanks,
>>> wenjin
>>>
>>> On 2024/04/09 04:48:48 Péter Váry wrote:
>>> > Forwarding the invite for the discussion we plan to do with the Iceberg
>>> > folks, as some of you might be interested in this.
>>> >
>>> > ---------- Forwarded message ---------
>>> > From: Brian Olsen <bi...@gmail.com>
>>> > Date: Mon, Apr 8, 2024, 18:29
>>> > Subject: Re: Flink table maintenance
>>> > To: <de...@iceberg.apache.org>
>>> >
>>> > Hey Iceberg nation,
>>> >
>>> > I would like to share about the meeting this Wednesday to further
>>> > discuss details of Péter's proposal on Flink Maintenance Tasks.
>>> > Calendar Link: https://calendar.app.google/83HGYWXoQJ8zXuVCA
>>> >
>>> > List discussion:
>>> > https://lists.apache.org/thread/10mdf9zo6pn0dfq791nf4w1m7jh9k3sl
>>> >
>>> > Design Doc: Flink table maintenance
>>> > https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit?usp=sharing
>>> >
>>> > On Mon, Apr 1, 2024 at 8:52 PM Manu Zhang <ow...@gmail.com> wrote:
>>> >
>>> > > Hi Peter,
>>> > >
>>> > > Are you proposing to create a user facing locking feature in
>>> > >> Iceberg, or just something for internal use?
>>> > >>
>>> > >
>>> > > Since it's a general issue, I'm proposing to create a general user
>>> > > interface first, while the implementation can be left to users. For
>>> > > example, we use Airflow to schedule maintenance jobs, and we can check
>>> > > in-progress jobs with the Airflow API. A Hive metastore lock might be
>>> > > another option we can implement for users.
>>> > >
>>> > > Thanks,
>>> > > Manu
>>> > >
>>> > > On Tue, Apr 2, 2024 at 5:26 AM Péter Váry <pe...@gmail.com> wrote:
>>> > >
>>> > >> Hi Ajantha,
>>> > >>
>>> > >> I thought about enabling post-commit topology based compaction for
>>> > >> sinks using options, like we use for the parametrization of streaming
>>> > >> reads [1]. I think it will be hard to do in a user-friendly way -
>>> > >> because of the high number of parameters - but I think it is a
>>> > >> possible solution with sensible defaults.
>>> > >>
>>> > >> There is a batch-like solution for data file compaction already
>>> > >> available [2], but I do not see how we could extend Flink SQL to be
>>> > >> able to call it.
>>> > >>
>>> > >> Writing to a branch using Flink SQL should be another thread, but at
>>> > >> first guess, it shouldn't be hard to implement using options, like:
>>> > >> /*+ OPTIONS('branch'='b1') */
>>> > >> since writing to a branch is already working through the Java API [3].
>>> > >>
>>> > >> Thanks, Peter
>>> > >>
>>> > >> 1 - https://iceberg.apache.org/docs/latest/flink-queries/#flink-streaming-read
>>> > >> 2 - https://github.com/apache/iceberg/blob/820fc3ceda386149f42db8b54e6db9171d1a3a6d/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java
>>> > >> 3 - https://iceberg.apache.org/docs/latest/flink-writes/#branch-writes
>>> > >>
>>> > >> On Mon, Apr 1, 2024, 16:30 Ajantha Bhat <aj...@gmail.com> wrote:
>>> > >>
>>> > >>> Thanks for the proposal, Peter.
>>> > >>>
>>> > >>> I just wanted to know: do we have any plans to support SQL syntax for
>>> > >>> table maintenance (like CALL procedures) for pure Flink SQL users?
>>> > >>> I didn't see any custom SQL parser plugin support in Flink. I also
>>> > >>> saw that branch writes don't have SQL support (only branch reads use
>>> > >>> options), so I am not sure about the roadmap of Iceberg SQL support
>>> > >>> in Flink. Was there any discussion before?
>>> > >>>
>>> > >>> - Ajantha
>>> > >>>
>>> > >>> On Mon, Apr 1, 2024 at 7:51 PM Péter Váry <pe...@gmail.com> wrote:
>>> > >>>
>>> > >>>> Hi Manu,
>>> > >>>>
>>> > >>>> Just to clarify:
>>> > >>>> - Are you proposing to create a user facing locking feature in
>>> > >>>> Iceberg, or just something for internal use?
>>> > >>>>
>>> > >>>> I think we shouldn't add locking to Iceberg's user facing scope at
>>> > >>>> this stage. A fully featured locking system has many more features
>>> > >>>> than we need (priorities, fairness, timeouts, etc.). I could be
>>> > >>>> tempted when we are talking about the REST catalog, but I think that
>>> > >>>> should be further down the road, if ever...
>>> > >>>>
>>> > >>>> About using the tags:
>>> > >>>> - I whole-heartedly agree that using tags is not intuitive, and I
>>> > >>>> see your points in most of your arguments. OTOH, introducing a new
>>> > >>>> requirement (a locking mechanism) seems like the wrong direction to me.
>>> > >>>> - We already defined a requirement (atomic changes on the table) for
>>> > >>>> the Catalog implementations, which could be used to achieve our goal here.
>>> > >>>> - We also already store technical data in snapshot properties in Flink
>>> > >>>> jobs (JobId, OperatorId, CheckpointId). Maybe technical tags/table
>>> > >>>> properties are not a big stretch.
>>> > >>>>
>>> > >>>> Or we can look at these tags or metadata as 'technical data' which
>>> > >>>> is internal to Iceberg, and shouldn't be exposed on the external
>>> > >>>> API. My concern is:
>>> > >>>> - Would it be used often enough to be worth the additional complexity?
>>> > >>>>
>>> > >>>> Knowing that Spark compaction is struggling with the same issue is a
>>> > >>>> good indicator, but we would probably need more use cases before
>>> > >>>> introducing a new feature with this complexity, or a simpler solution.
>>> > >>>>
>>> > >>>> Thanks, Peter
>>> > >>>>
>>> > >>>> On Mon, Apr 1, 2024, 10:18 Manu Zhang <ow...@gmail.com> wrote:
>>> > >>>>
>>> > >>>>> What would the community think of exploiting tags for preventing
>>> > >>>>>> concurrent maintenance loop executions?
>>> > >>>>>
>>> > >>>>> This issue is not specific to Flink maintenance jobs. We have a
>>> > >>>>> service scheduling Spark maintenance jobs by watching table
>>> > >>>>> commits. When we don't check in-progress maintenance jobs for the
>>> > >>>>> same table, multiple jobs will run concurrently and have conflicts.
>>> > >>>>>
>>> > >>>>> Basically, I think we need a lock mechanism like the metastore lock
>>> > >>>>> https://iceberg.apache.org/docs/nightly/configuration/#hadoop-configuration
>>> > >>>>> if we want to handle it for users. However, using a tag doesn't
>>> > >>>>> look intuitive to me. We are also mixing user data with system
>>> > >>>>> metadata. Maybe we can define some general interfaces and leave the
>>> > >>>>> implementation to users in the first version.
>>> > >>>>>
>>> > >>>>> Regards,
>>> > >>>>> Manu
>>> > >>>>>
>>> > >>>>> On Fri, Mar 29, 2024 at 1:59 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>>> > >>>>>
>>> > >>>>>> What would the community think of exploiting tags for preventing
>>> > >>>>>> concurrent maintenance loop executions?
>>> > >>>>>>
>>> > >>>>>> The issue:
>>> > >>>>>> Some maintenance tasks can't run in parallel, like DeleteOrphanFiles
>>> > >>>>>> vs. ExpireSnapshots, or RewriteDataFiles vs. RewriteManifestFiles.
>>> > >>>>>> We make sure not to run tasks started by a single trigger
>>> > >>>>>> concurrently by serializing them, but there are no loops in Flink,
>>> > >>>>>> so we can't synchronize tasks started by the next trigger.
>>> > >>>>>>
>>> > >>>>>> In the document, I describe that we need to rely on the user to
>>> > >>>>>> ensure that the rate limit is high enough to prevent concurrent
>>> > >>>>>> triggers.
>>> > >>>>>>
>>> > >>>>>> Proposal:
>>> > >>>>>> When firing a trigger, the RateLimiter could check and create an
>>> > >>>>>> Iceberg table tag [1] for the current table snapshot, with the
>>> > >>>>>> name '__flink_maintenance'. When the execution finishes, we remove
>>> > >>>>>> this tag. The RateLimiter keeps accumulating changes, and doesn't
>>> > >>>>>> fire new triggers as long as it finds this tag on the table.
>>> > >>>>>> The solution relies on Flink's 'forceNonParallel' to prevent
>>> > >>>>>> concurrent execution of placing the tag, and on Iceberg to store it.
>>> > >>>>>>
>>> > >>>>>> This does not use tags as intended, but seems like a better
>>> > >>>>>> solution than adding/removing table properties, which would
>>> > >>>>>> clutter the table history with technical data.
>>> > >>>>>>
>>> > >>>>>> Your thoughts? Any other suggestions/solutions would be welcome.
>>> > >>>>>>
>>> > >>>>>> Thanks,
>>> > >>>>>> Peter
>>> > >>>>>>
>>> > >>>>>> [1] https://iceberg.apache.org/docs/latest/java-api-quickstart/#branching-and-tagging
>>> > >>>>>>
>>> > >>>>>> On Thu, Mar 28, 2024, 14:44 Péter Váry <pe...@gmail.com> wrote:
>>> > >>>>>>
>>> > >>>>>>> Hi Team,
>>> > >>>>>>>
>>> > >>>>>>> As discussed on yesterday's community sync, I am working on adding
>>> > >>>>>>> the possibility for the Flink Iceberg connector to run maintenance
>>> > >>>>>>> tasks on Iceberg tables. This will fix the small-files issue and,
>>> > >>>>>>> in the long run, help compact the high number of positional and
>>> > >>>>>>> equality deletes created by Flink tasks writing CDC data to
>>> > >>>>>>> Iceberg tables, without needing Spark in the infrastructure.
>>> > >>>>>>>
>>> > >>>>>>> I did some planning and prototyping, and I am currently trying out
>>> > >>>>>>> the solution on a larger scale.
>>> > >>>>>>>
>>> > >>>>>>> I put together a document on how my current solution looks:
>>> > >>>>>>> https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit?usp=sharing
>>> > >>>>>>>
>>> > >>>>>>> I would love to hear your thoughts and feedback on this, to find a
>>> > >>>>>>> good final solution.
>>> > >>>>>>>
>>> > >>>>>>> Thanks,
>>> > >>>>>>> Peter
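
For readers following along, the check/acquire/release cycle of the tag-based guard discussed in the thread can be sketched as below. This is only an illustrative sketch, not the proposed implementation: a ConcurrentHashMap stands in for the Iceberg table's tag store, and the class and method names are hypothetical. A real implementation would go through Iceberg's ManageSnapshots API (table.manageSnapshots().createTag(name, snapshotId) / removeTag(name)) and rely on the catalog's atomic commit guarantee instead of putIfAbsent.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the tag-based maintenance guard.
class MaintenanceTagLock {
    static final String LOCK_TAG = "__flink_maintenance";

    // Stand-in for the table's tags: tag name -> snapshot id it points at.
    private final Map<String, Long> tags = new ConcurrentHashMap<>();

    /**
     * Called before firing a trigger. Returns false if a maintenance run is
     * already in flight, so the RateLimiter keeps accumulating changes.
     */
    boolean tryAcquire(long snapshotId) {
        // putIfAbsent is atomic: only one caller can create the lock tag,
        // mirroring the atomic tag creation a catalog commit would provide.
        return tags.putIfAbsent(LOCK_TAG, snapshotId) == null;
    }

    /** Called when the serialized maintenance tasks of a trigger finish. */
    void release() {
        tags.remove(LOCK_TAG);
    }
}
```

The point of the pattern is that the "is maintenance running?" check and the marker creation are one atomic step, so two triggers racing on the same table cannot both proceed; the document's 'forceNonParallel' requirement plays the same role on the Flink side.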