Thanks Zhu and Gen for your replies!

*Unaligned checkpoints*
The maintenance tasks are big and complex. Many parts can be parallelized into multiple tasks, so I think we should implement them as chains of Flink operators to exploit the advantages of Flink. A single maintenance task could take a serious amount of time even with the parallelization, and will most probably cause backpressure somewhere in the flow. This backpressure will delay checkpointing. We can either fight these delays one by one with buffer settings/AsyncIO, or use unaligned checkpoints, which are designed for exactly these cases.
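To make the trade-off concrete, here is a toy model in plain Java. It is not Flink code, and the class and method names are made up for illustration. It assumes a single input channel where some records are already buffered when the checkpoint barrier arrives: with aligned checkpoints the barrier waits behind them, with unaligned checkpoints it overtakes them and the overtaken records are stored in the checkpoint instead.

```java
import java.util.List;

// Toy model of one input channel under backpressure. Not Flink code:
// the class and method names are hypothetical.
class BarrierToyModel {

    // Aligned checkpoint: the barrier queues behind every buffered record,
    // so it is delayed by the time needed to process all of them.
    static long alignedDelayMillis(List<Long> bufferedRecordCostsMillis) {
        long delay = 0;
        for (long cost : bufferedRecordCostsMillis) {
            delay += cost;
        }
        return delay;
    }

    // Unaligned checkpoint: the barrier overtakes the buffered records,
    // which then have to be written into the checkpoint instead.
    static int unalignedRecordsPersisted(List<Long> bufferedRecordCostsMillis) {
        return bufferedRecordCostsMillis.size();
    }

    public static void main(String[] args) {
        // Three compaction groups waiting in the buffer: 40 s, 90 s and 50 s of work.
        List<Long> buffered = List.of(40_000L, 90_000L, 50_000L);
        System.out.println("aligned delay (ms): " + alignedDelayMillis(buffered));
        System.out.println("records stored by unaligned checkpoint: "
                + unalignedRecordsPersisted(buffered));
    }
}
```

In this model the aligned barrier is held back by the sum of the buffered work, while the unaligned one is not delayed but makes the checkpoint larger. Real jobs have many channels and operators, so the numbers are only meant to show the direction of the effect.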
*PostCommit or Separate job*
I agree with both of you that in most cases separating the maintenance pipelines from the actual jobs is the way to go. However, there are 2 cases highlighted by the Iceberg community which would not be available in this case:
- A Flink job doing a quick small-file reduction after collecting several commits
- Single, all-in-one Flink jobs for sinking data to an Iceberg table
Sooner or later we need to move to the new Flink SinkV2 API anyway. If we plan the architecture correctly, it is not too much work to add the maintenance tasks to the end of the sink (if the user decides that this serves their case best).

*OperatorCoordinator*
Both of you mention OperatorCoordinator.Context#getCoordinatorStore(), which might serve our needs. I checked it again, and it is marked as Internal by the Flink community. We are not supposed to use it outside of Flink, as it could be broken at any time in the future. Do you know about plans to make this a Public interface?

*AsyncIO*
Even with unaligned checkpoints there are issues with this. See: https://issues.apache.org/jira/browse/FLINK-34704
So I do not think AsyncIO could help with checkpoints until this issue is fixed. After that, we can use it to execute long tasks outside of the main thread, and maybe eventually relax the unaligned checkpoint requirement. (AsyncIO operators behave like unaligned checkpoints behind the scenes, as the input and output records are serialized in the state.)

*CALL statements*
You mentioned that by using CALL statements, we do not reserve the resources when we are not using them. I might be missing something, but I think that the CALL procedure will run in the same application where it was called, and we need to provide enough resources to this application so that we are able to finish the maintenance tasks. When there are no maintenance tasks, this application doesn't do anything (essentially idling the resources).
I don't see yet how this is different from scheduling the tasks by a specific operator. What am I missing?

Thanks,
Peter

Gen Luo <luogen...@gmail.com> wrote (on Thu, Apr 18, 2024, 14:32):

> Hi,
>
> Thanks for the reply, Peter.
> And thanks Zhu for joining the discussion.
>
> > In the current design, using unaligned checkpoints, and splitting heavy tasks to smaller ones could avoid maintenance tasks blocking the checkpointing.
> > How do you suggest executing asynchronous tasks
>
> As far as I know, unaligned checkpoints are not widely used (at least not from what I have seen). And in some cases, they may slow down the checkpointing and restarting since more data (of the original job) is snapshotted. So the solution may not be able to cover all scenarios. So yes, I would suggest using AsyncIO, or something like it, i.e. we can implement our own async operator if we need to coordinate the async tasks. We can also give the operator a separate slot sharing group (ssg), and this should be possible too if we use the AsyncIO.
>
> > CALL statements are one-time calls, while the goal here is to continuously schedule the tasks.
>
> Yes it is. I meant to suggest that we can start up a separate job in application mode, which can monitor the table and submit CALL statements continuously. This is proposed because both using the post commit topology and using a separate maintenance job would occupy the resources forever, while in this way the maintenance job is started only when necessary. But if the maintenance tasks are triggered frequently, holding the resources would not be an issue any longer.
>
> > Are the OperatorCoordinators for different Operators able to communicate to each other?
> > Is it possible for the Update Operator to communicate with the Planner Operator through the OperatorCoordinator?
>
> OperatorCoordinator.Context#getCoordinatorStore() can do this. You may put messages or even a reference to a coordinator for other coordinators to get.
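The "own async operator" idea in the message above can be pictured roughly as follows. This is a minimal sketch in plain Java, not Flink's AsyncIO and not a Flink operator: the class name, the method names and the "compact-group-1" request are all hypothetical. It only shows the pattern of handing work to an executor pool while keeping the unfinished requests in a structure that can be copied quickly at checkpoint time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stand-in for an operator that runs heavy tasks off the main thread.
class AsyncTaskRunnerSketch {
    private final ExecutorService pool = Executors.newFixedThreadPool(2);
    // Requests that were accepted but have not finished yet.
    private final Map<Long, String> inFlight = new ConcurrentHashMap<>();
    private long nextId = 0;

    // Main thread: registers the request, hands it to the pool and returns immediately.
    synchronized long submit(String request, Consumer<String> work) {
        long id = nextId++;
        inFlight.put(id, request);
        pool.execute(() -> {
            try {
                work.accept(request);
            } finally {
                inFlight.remove(id);
            }
        });
        return id;
    }

    // Main thread, at checkpoint time: copies the pending requests without waiting for them.
    List<String> snapshot() {
        return new ArrayList<>(inFlight.values());
    }

    // Waits for the running tasks to finish and stops the pool.
    void shutdown() {
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        CountDownLatch release = new CountDownLatch(1);
        AsyncTaskRunnerSketch runner = new AsyncTaskRunnerSketch();
        runner.submit("compact-group-1", request -> {
            try {
                release.await(); // simulates a long-running maintenance task
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        System.out.println("pending at checkpoint: " + runner.snapshot());
        release.countDown();
        runner.shutdown();
        System.out.println("pending after completion: " + runner.snapshot());
    }
}
```

After a restore, the snapshotted requests would have to be submitted again, so a request can run more than once and the work behind it would need to tolerate that.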
>
>
> On Thu, Apr 18, 2024 at 3:08 AM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>
>> Hi Gen,
>>
>> Thanks for your interest and thoughts!
>> See my answers and questions below:
>>
>> Gen Luo <luogen...@gmail.com> wrote (on Wed, Apr 17, 2024, 20:32):
>>
>>> Hi,
>>>
>>> Sorry for joining the discussion so late. I'm from the Flink community and did some work on SinkV2. I'd like to share some thoughts from the view of Flink. As I'm quite new to Iceberg, please feel free to correct me if I'm making any mistakes.
>>>
>>> 1. checkpointing
>>> I think it is better for the maintenance tasks not to disturb the original job. So it's better not to block the checkpointing, or the checkpoints may time out and fail when a heavy task is running, which may cause the job to fail. To achieve this, the task executing operators need to split the heavy task into small ones, or execute the tasks asynchronously in an executor pool instead of the main task thread and store the task requests in the snapshots. In this way the snapshotting can be done in a guaranteed short time. However, if the tasks are done with complex topologies, it may be impossible to do this. If so, I think it's better to start a separate job instead of using the post commit topology.
>>>
>>
>> In the current design, using unaligned checkpoints, and splitting heavy tasks to smaller ones could avoid maintenance tasks blocking the checkpointing. Also the design makes it possible to define different slot sharing groups (https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/finegrained_resource/), which makes sure the main job threads are not blocked by the maintenance tasks.
>>
>> That said, you are right that in some situations it is better to start the job in a separate thread.
>>
>> How do you suggest executing asynchronous tasks, and how could these asynchronous tasks communicate with the main thread? Are you suggesting to use the Flink AsyncIO (https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/)? While it runs in a separate thread, it still uses the same resources (nodes) as the default jobs.
>>
>>> 2. resources
>>> Occupying lots of resources when tasks are idle may be an issue, but to execute the tasks periodically in the job, it seems that there's no perfect solution. All we can do is to reduce the resource requirements.
>>>
>>> On the other hand, Flink recently added support for the CALL statement in 1.18. I have to say I'm not really familiar with it, but it seems to be a solution to execute the tasks one time. Maybe we can create a monitoring job (or do it in the main function), which can trigger tasks by executing CALL statements. In this way resource wasting can be avoided, but this is quite a different direction indeed.
>>>
>>
>> CALL statements are one-time calls, while the goal here is to continuously schedule the tasks.
>> Happily the plan includes separate flows for the specific tasks, so if we decide that we want to schedule the tasks by hand, then it is possible to start them from the CALL statement implementations.
>>
>>
>>> Besides, I remember that at the meeting, it was proposed to adjust the resources for maintenance tasks by auto scaling. I'm afraid it's not a good idea to rely on it. As far as I know, the rescaling it triggers needs to restart the job, which may not be acceptable for most of the users.
>>>
>>
>> Also in the meeting I tried to explain that it should only work when the "in-place" rescaling would work, so we are absolutely aligned here.
>>
>>
>>> 3. concurrency
>>> If the table is only operated in one Flink job, we can use OperatorCoordinator to coordinate the tasks.
>>> OperatorCoordinators may also contact each other to ensure different kinds of tasks are executed in expected ways.
>>>
>>
>> Are the OperatorCoordinators for different Operators able to communicate to each other?
>> In the planned RewriteDataFiles flow (data file compaction), we have 3 chained operators:
>> 1. Planner - to plan the groups we want to compact
>> 2. Executor - to read the small files, and create a single big file
>> 3. Updater - to collect the new files and update the Iceberg table
>>
>> Is it possible for the Update Operator to communicate with the Planner Operator through the OperatorCoordinator?
>>
>>
>>> While I suppose it's necessary to have a lock mechanism if we want to coordinate the operation from different engines.
>>>
>>> (Am I replying to the correct mail?)
>>>
>>
>> Yes :)
>>
>>>
>>> Thanks,
>>> Gen
>>>
>>> On Tue, Apr 9, 2024 at 12:29 AM Brian Olsen <bitsondata...@gmail.com> wrote:
>>>
>>>> Hey Iceberg nation,
>>>>
>>>> I would like to share about the meeting this Wednesday to further discuss details of Péter's proposal on Flink Maintenance Tasks.
>>>> Calendar Link: https://calendar.app.google/83HGYWXoQJ8zXuVCA
>>>>
>>>> List discussion: https://lists.apache.org/thread/10mdf9zo6pn0dfq791nf4w1m7jh9k3sl
>>>>
>>>> Design Doc: Flink table maintenance <https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit?usp=sharing>
>>>>
>>>>
>>>>
>>>> On Mon, Apr 1, 2024 at 8:52 PM Manu Zhang <owenzhang1...@gmail.com> wrote:
>>>>
>>>>> Hi Peter,
>>>>>
>>>>>> Are you proposing to create a user facing locking feature in Iceberg, or just something for internal use?
>>>>>>
>>>>>
>>>>> Since it's a general issue, I'm proposing to create a general user interface first, while the implementation can be left to users. For example, we use Airflow to schedule maintenance jobs and we can check in-progress jobs with the Airflow API. Hive metastore lock might be another option we can implement for users.
>>>>>
>>>>> Thanks,
>>>>> Manu
>>>>>
>>>>> On Tue, Apr 2, 2024 at 5:26 AM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>>>>>
>>>>>> Hi Ajantha,
>>>>>>
>>>>>> I thought about enabling post commit topology based compaction for sinks using options, like we use for the parametrization of streaming reads [1]. I think it will be hard to do it in a user friendly way - because of the high number of parameters -, but I think it is a possible solution with sensible defaults.
>>>>>>
>>>>>> There is a batch-like solution for data file compaction already available [2], but I do not see how we could extend Flink SQL to be able to call it.
>>>>>>
>>>>>> Writing to a branch using Flink SQL should be another thread, but by my first guess, it shouldn't be hard to implement using options, like:
>>>>>> /*+ OPTIONS('branch'='b1') */
>>>>>> Since writing to a branch is already working through the Java API [3].
>>>>>>
>>>>>> Thanks, Peter
>>>>>>
>>>>>> 1 - https://iceberg.apache.org/docs/latest/flink-queries/#flink-streaming-read
>>>>>> 2 - https://github.com/apache/iceberg/blob/820fc3ceda386149f42db8b54e6db9171d1a3a6d/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java
>>>>>> 3 - https://iceberg.apache.org/docs/latest/flink-writes/#branch-writes
>>>>>>
>>>>>> On Mon, Apr 1, 2024, 16:30 Ajantha Bhat <ajanthab...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks for the proposal Peter.
>>>>>>>
>>>>>>> I just wanted to know: do we have any plans for supporting SQL syntax for table maintenance (like CALL procedure) for pure Flink SQL users?
>>>>>>> I didn't see any custom SQL parser plugin support in Flink. I also saw that Branch write doesn't have SQL support (only Branch reads use Option), so I am not sure about the roadmap of Iceberg SQL support in Flink.
>>>>>>> Was there any discussion before?
>>>>>>>
>>>>>>> - Ajantha
>>>>>>>
>>>>>>> On Mon, Apr 1, 2024 at 7:51 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Manu,
>>>>>>>>
>>>>>>>> Just to clarify:
>>>>>>>> - Are you proposing to create a user facing locking feature in Iceberg, or just something for internal use?
>>>>>>>>
>>>>>>>> I think we shouldn't add locking to Iceberg's user facing scope at this stage. A fully featured locking system has many more features that we would need (priorities, fairness, timeouts etc). I could be tempted when we are talking about the REST catalog, but I think that should be further down the road, if ever...
>>>>>>>>
>>>>>>>> About using the tags:
>>>>>>>> - I whole-heartedly agree that using tags is not intuitive, and I see your points in most of your arguments. OTOH, introducing a new requirement (locking mechanism) seems like a wrong direction to me.
>>>>>>>> - We already defined a requirement (atomic changes on the table) for the Catalog implementations which could be used to achieve our goal here.
>>>>>>>> - We also already store technical data in snapshot properties in Flink jobs (JobId, OperatorId, CheckpointId). Maybe technical tags/table properties are not a big stretch.
>>>>>>>>
>>>>>>>> Or we can look at these tags or metadata as 'technical data' which is internal to Iceberg, and shouldn't be exposed on the external API.
>>>>>>>> My concern is:
>>>>>>>> - Would it be used often enough to be worth the additional complexity?
>>>>>>>>
>>>>>>>> Knowing that Spark compaction is struggling with the same issue is a good indicator, but probably we would need more use cases for introducing a new feature with this complexity, or a simpler solution.
>>>>>>>>
>>>>>>>> Thanks, Peter
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Apr 1, 2024, 10:18 Manu Zhang <owenzhang1...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>>> What would the community think of exploiting tags for preventing concurrent maintenance loop executions.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> This issue is not specific to Flink maintenance jobs. We have a service scheduling Spark maintenance jobs by watching table commits. When we don't check in-progress maintenance jobs for the same table, multiple jobs will run concurrently and have conflicts.
>>>>>>>>>
>>>>>>>>> Basically, I think we need a lock mechanism like the metastore lock <https://iceberg.apache.org/docs/nightly/configuration/#hadoop-configuration> if we want to handle it for users. However, using TAG doesn't look intuitive to me. We are also mixing user data with system metadata.
>>>>>>>>> Maybe we can define some general interfaces and leave the implementation to users in the first version.
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Manu
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Mar 29, 2024 at 1:59 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> What would the community think of exploiting tags for preventing concurrent maintenance loop executions.
>>>>>>>>>>
>>>>>>>>>> The issue:
>>>>>>>>>> Some maintenance tasks can't run in parallel, like DeleteOrphanFiles vs. ExpireSnapshots, or RewriteDataFiles vs. RewriteManifestFiles.
>>>>>>>>>> We make sure not to run tasks started by a single trigger concurrently by serializing them, but there are no loops in Flink, so we can't synchronize tasks started by the next trigger.
>>>>>>>>>>
>>>>>>>>>> In the document, I describe that we need to rely on the user to ensure that the rate limit is high enough to prevent concurrent triggers.
>>>>>>>>>>
>>>>>>>>>> Proposal:
>>>>>>>>>> When firing a trigger, RateLimiter could check and create an Iceberg table tag [1] for the current table snapshot, with the name: '__flink_maitenance'. When the execution finishes we remove this tag. The RateLimiter keeps accumulating changes, and doesn't fire new triggers while it finds this tag on the table.
>>>>>>>>>> The solution relies on Flink 'forceNonParallel' to prevent concurrent execution of placing the tag, and on Iceberg to store it.
>>>>>>>>>>
>>>>>>>>>> This does not use the tags as intended, but seems like a better solution than adding/removing table properties, which would clutter the table history with technical data.
>>>>>>>>>>
>>>>>>>>>> Your thoughts? Any other suggestions/solutions would be welcome.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Peter
>>>>>>>>>>
>>>>>>>>>> [1] https://iceberg.apache.org/docs/latest/java-api-quickstart/#branching-and-tagging
>>>>>>>>>>
>>>>>>>>>> On Thu, Mar 28, 2024, 14:44 Péter Váry <peter.vary.apa...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Team,
>>>>>>>>>>>
>>>>>>>>>>> As discussed on yesterday's community sync, I am working on adding a possibility to the Flink Iceberg connector to run maintenance tasks on the Iceberg tables.
>>>>>>>>>>> This will fix the small files issues and in the long run help compacting the high number of positional and equality deletes created by Flink tasks writing CDC data to Iceberg tables, without the need of Spark in the infrastructure.
>>>>>>>>>>>
>>>>>>>>>>> I did some planning and prototyping, and am currently trying out the solution on a larger scale.
>>>>>>>>>>>
>>>>>>>>>>> I put together a document describing what my current solution looks like:
>>>>>>>>>>>
>>>>>>>>>>> https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit?usp=sharing
>>>>>>>>>>>
>>>>>>>>>>> I would love to hear your thoughts and feedback on this to find a good final solution.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Peter
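The tag-based guard discussed earlier in this thread (check and create a tag when firing a trigger, remove it when the run finishes, keep accumulating changes in between) could look roughly like the sketch below. It is plain Java with an in-memory Set standing in for the tags of an Iceberg table. The class name, the method names, the "maintenance-guard" tag and the commit-count threshold are assumptions made for illustration, not Iceberg or Flink API.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the tag-based guard. The Set stands in for the table's tags.
class TagGuardSketch {
    private final Set<String> tableTags;
    private final String guardTag;
    private final int commitsPerTrigger;
    private int accumulatedCommits = 0;

    TagGuardSketch(Set<String> tableTags, String guardTag, int commitsPerTrigger) {
        this.tableTags = tableTags;
        this.guardTag = guardTag;
        this.commitsPerTrigger = commitsPerTrigger;
    }

    // Called for every table commit. Returns true when a maintenance run should start.
    synchronized boolean onCommit() {
        accumulatedCommits++;
        if (accumulatedCommits < commitsPerTrigger) {
            return false;
        }
        // Set.add is the "check and create" step: it returns false if the tag already exists.
        if (!tableTags.add(guardTag)) {
            return false; // a previous run is still in progress, keep accumulating
        }
        accumulatedCommits = 0;
        return true;
    }

    // Called when the maintenance run finishes: removing the tag allows the next trigger.
    void onRunFinished() {
        tableTags.remove(guardTag);
    }

    public static void main(String[] args) {
        Set<String> tags = ConcurrentHashMap.newKeySet();
        TagGuardSketch guard = new TagGuardSketch(tags, "maintenance-guard", 2);
        System.out.println(guard.onCommit()); // below the threshold
        System.out.println(guard.onCommit()); // threshold reached, tag created
        System.out.println(guard.onCommit()); // accumulating again
        System.out.println(guard.onCommit()); // threshold reached, but the tag is still there
        guard.onRunFinished();
        System.out.println(guard.onCommit()); // tag removed, accumulated changes fire now
    }
}
```

The `synchronized` keyword plays the role that 'forceNonParallel' has in the proposal: only one caller at a time performs the check-and-create step. Against a real table that step would have to be an atomic table change, which this sketch does not attempt to show.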