Hi Gen, Thanks for your interest and thoughts! See my answers and questions below:
Gen Luo <luogen...@gmail.com> ezt írta (időpont: 2024. ápr. 17., Sze, 20:32): > Hi, > > Sorry for joining the discussion so late. I'm from the flink community and > did some work about the SinkV2. I'd like to share some thoughts from the > view of flink. While I'm quite new to Iceberg, please feel free to correct > me if I'm making any mistakes. > > 1. checkpointing > I think the maintenance tasks are better not to disturb the original job. > So it's better not to block the checkpointing, or the checkpoints may > timeout and fail when a heavy task is running, which may cause the job > failing. To achieve this, the task executing operators need to split the > heavy task into small ones, or execute the tasks asynchronously in an > executor pool instead of the main task thread and store the task requests > in the snapshots. In this way the snapshotting can be done in a guaranteed > short time. While if the tasks are done with complex topologies, it may be > impossible to do this. If so, I think it's better to start a separate job > instead of using the post commit topology. > In the current design, using unaligned checkpoints, and splitting heavy tasks to smaller ones could avoid maintenance tasks blocking the checkpointing. Also the design makes it possible to define different slot sharing groups ( https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/finegrained_resource/) which makes sure that the main job threads are not blocked by the maintenance tasks. That said, you are right that in some situations it is better to start the job in a separate thread. How do you suggest executing asynchronous tasks, and how could these asynchronous tasks communicate with the main thread? Are you suggesting to use the Flink AsyncIO ( https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/). While it runs in a separate thread, it still uses the same resources (nodes) as the default jobs. 2. resources > Occupying lots of resources when tasks are idle may be an issue, but to > execute the tasks periodically in the job, it seems that there's no perfect > solution. All we can do is to reduce the resource requirements. > > On the other hand, Flink recently supported the CALL statement in 1.18. I > have to say I'm not really familiar with it, but it seems to be a solution > to execute the tasks for one time. Maybe we can create a monitoring job (or > in the main function), which can trigger tasks by executing CALL > statements. In this way resource wasting can be avoided, but this is quite > a different direction indeed. > CALL statements are one-time calls, while the goal here is to continuously schedule the tasks. Happily the plan includes separate flows for the specific tasks, so if we decide that we want to schedule the tasks by hand, then it is possible to start them from the CALL statement implementations. > Besides, I remember that at the meeting, it's proposed to adjust the > resource for maintenance tasks by auto scaling. I'm afraid it's not a good > idea to rely on it. As far as I know, rescaling it triggers needs to > restart the job, which may not be acceptable by most of the users. > Also in the meeting I tried to explain that it should only work when the "in-place" rescaling would work, so we are absolutely aligned here. > 3. concurrency > If the table is only operated in one Flink job, we can use > OperatorCoordinator to coordinate the tasks. OperatorCoordinator may also > contact each other to ensure different kinds of tasks are executed in > expected ways. > Are the OperatorCoordinators for different Operators able to communicate to each other? In the planned RewriteDataFiles flow (data file compaction), we have 3 chained operators: 1. Planner - to plan the groups we want to compact 2. Executor - to read the small files, and create a single big file 3. Updater - to collect the new files and update the Iceberg table Is it possible for the Update Operator to communicate with the Planner Operator through the OperatorCoordinator? > While I suppose it's necessary to have a lock mechanism if we want to > coordinate the operation from different engines. > > (Am I replying to the correct mail?) > Yes :) > > Thanks, > Gen > > On Tue, Apr 9, 2024 at 12:29 AM Brian Olsen <bitsondata...@gmail.com> > wrote: > >> Hey Iceberg nation, >> >> I would like to share about the meeting this Wednesday to further >> discuss details of Péter's proposal on Flink Maintenance Tasks. >> Calendar Link: https://calendar.app.google/83HGYWXoQJ8zXuVCA >> >> List discussion: >> https://lists.apache.org/thread/10mdf9zo6pn0dfq791nf4w1m7jh9k3sl >> <https://www.google.com/url?q=https://lists.apache.org/thread/10mdf9zo6pn0dfq791nf4w1m7jh9k3sl&sa=D&source=calendar&usd=2&usg=AOvVaw2-aePIRr6APFVHpRDipMgX> >> >> Design Doc: Flink table maintenance >> <https://www.google.com/url?q=https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit?usp%3Dsharing&sa=D&source=calendar&usd=2&usg=AOvVaw1oLYQP76-G1ZEOW5pTxV1M> >> >> >> >> On Mon, Apr 1, 2024 at 8:52 PM Manu Zhang <owenzhang1...@gmail.com> >> wrote: >> >>> Hi Peter, >>> >>> Are you proposing to create a user facing locking feature in Iceberg, or >>>> just something something for internal use? >>>> >>> >>> Since it's a general issue, I'm proposing to create a general user >>> interface first, while the implementation can be left to users. For >>> example, we use Airflow to schedule maintenance jobs and we can check >>> in-progress jobs with the Airflow API. Hive metastore lock might be another >>> option we can implement for users. >>> >>> Thanks, >>> Manu >>> >>> On Tue, Apr 2, 2024 at 5:26 AM Péter Váry <peter.vary.apa...@gmail.com> >>> wrote: >>> >>>> Hi Ajantha, >>>> >>>> I thought about enabling post commit topology based compaction for >>>> sinks using options, like we use for the parametrization of streaming reads >>>> [1]. I think it will be hard to do it in a user friendly way - because of >>>> the high number of parameters -, but I think it is a possible solution with >>>> sensible defaults. >>>> >>>> There is a batch-like solution for data file compaction already >>>> available [2], but I do not see how we could extend Flink SQL to be able to >>>> call it. >>>> >>>> Writing to a branch using Flink SQL should be another thread, but by my >>>> first guess, it shouldn't be hard to implement using options, like: >>>> /*+ OPTIONS('branch'='b1') */ >>>> Since writing to branch i already working through the Java API [3]. >>>> >>>> Thanks, Peter >>>> >>>> 1 - >>>> https://iceberg.apache.org/docs/latest/flink-queries/#flink-streaming-read >>>> 2 - >>>> https://github.com/apache/iceberg/blob/820fc3ceda386149f42db8b54e6db9171d1a3a6d/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java >>>> 3 - >>>> https://iceberg.apache.org/docs/latest/flink-writes/#branch-writes >>>> >>>> On Mon, Apr 1, 2024, 16:30 Ajantha Bhat <ajanthab...@gmail.com> wrote: >>>> >>>>> Thanks for the proposal Peter. >>>>> >>>>> I just wanted to know do we have any plans for supporting SQL syntax >>>>> for table maintenance (like CALL procedure) for pure Flink SQL users? >>>>> I didn't see any custom SQL parser plugin support in Flink. I also saw >>>>> that Branch write doesn't have SQL support (only Branch reads use Option), >>>>> So I am not sure about the roadmap of Iceberg SQL support in Flink. >>>>> Was there any discussion before? >>>>> >>>>> - Ajantha >>>>> >>>>> On Mon, Apr 1, 2024 at 7:51 PM Péter Váry <peter.vary.apa...@gmail.com> >>>>> wrote: >>>>> >>>>>> Hi Manu, >>>>>> >>>>>> Just to clarify: >>>>>> - Are you proposing to create a user facing locking feature in >>>>>> Iceberg, or just something something for internal use? >>>>>> >>>>>> I think we shouldn't add locking to Iceberg's user facing scope in >>>>>> this stage. A fully featured locking system has many more features that >>>>>> we >>>>>> need (priorities, fairness, timeouts etc). I could be tempted when we are >>>>>> talking about the REST catalog, but I think that should be further down >>>>>> the >>>>>> road, if ever... >>>>>> >>>>>> About using the tags: >>>>>> - I whole-heartedly agree that using tags is not intuitive, and I see >>>>>> your points in most of your arguments. OTOH, introducing new requirement >>>>>> (locking mechanism) seems like a wrong direction to me. >>>>>> - We already defined a requirement (atomic changes on the table) for >>>>>> the Catalog implementations which could be used to archive our goal here. >>>>>> - We also already store technical data in snapshot properties in >>>>>> Flink jobs (JobId, OperatorId, CheckpointId). Maybe technical tags/table >>>>>> properties is not a big stretch. >>>>>> >>>>>> Or we can look at these tags or metadata as 'technical data' which is >>>>>> internal to Iceberg, and shouldn't expressed on the external API. My >>>>>> concern is: >>>>>> - Would it be used often enough to worth the additional complexity? >>>>>> >>>>>> Knowing that Spark compaction is struggling with the same issue is a >>>>>> good indicator, but probably we would need more use cases for >>>>>> introducing a >>>>>> new feature with this complexity, or simpler solution. >>>>>> >>>>>> Thanks, Peter >>>>>> >>>>>> >>>>>> On Mon, Apr 1, 2024, 10:18 Manu Zhang <owenzhang1...@gmail.com> >>>>>> wrote: >>>>>> >>>>>>> What would the community think of exploiting tags for preventing >>>>>>>> concurrent maintenance loop executions. >>>>>>> >>>>>>> >>>>>>> This issue is not specific to Flink maintenance jobs. We have a >>>>>>> service scheduling Spark maintenance jobs by watching table commits. >>>>>>> When >>>>>>> we don't check in-progress maintenance jobs for the same table, multiple >>>>>>> jobs will run concurrently and have conflicts. >>>>>>> >>>>>>> Basically, I think we need a lock mechanism like the metastore lock >>>>>>> <https://iceberg.apache.org/docs/nightly/configuration/#hadoop-configuration> >>>>>>> if we want to handle it for users. However, using TAG doesn't look >>>>>>> intuitive to me. We are also mixing user data with system metadata. >>>>>>> Maybe we can define some general interfaces and leave the >>>>>>> implementation to users in the first version. >>>>>>> >>>>>>> Regards, >>>>>>> Manu >>>>>>> >>>>>>> >>>>>>> >>>>>>> On Fri, Mar 29, 2024 at 1:59 PM Péter Váry < >>>>>>> peter.vary.apa...@gmail.com> wrote: >>>>>>> >>>>>>>> What would the community think of exploiting tags for preventing >>>>>>>> concurrent maintenance loop executions. >>>>>>>> >>>>>>>> The issue: >>>>>>>> Some maintenance tasks couldn't run parallel, like >>>>>>>> DeleteOrphanFiles vs. ExpireSnapshots, or RewriteDataFiles vs. >>>>>>>> RewriteManifestFiles. We make sure, not to run tasks started by a >>>>>>>> single >>>>>>>> trigger concurrently by serializing them, but there are no loops in >>>>>>>> Flink, >>>>>>>> so we can't synchronize tasks started by the next trigger. >>>>>>>> >>>>>>>> In the document, I describe that we need to rely on the user to >>>>>>>> ensure that the rate limit is high enough to prevent concurrent >>>>>>>> triggers. >>>>>>>> >>>>>>>> Proposal: >>>>>>>> When firing a trigger, RateLimiter could check and create an >>>>>>>> Iceberg table tag [1] for the current table snapshot, with the name: >>>>>>>> '__flink_maitenance'. When the execution finishes we remove this tag. >>>>>>>> The >>>>>>>> RateLimiter keep accumulating changes, and doesn't fire new triggers >>>>>>>> until >>>>>>>> it finds this tag on the table. >>>>>>>> The solution relies on Flink 'forceNonParallel' to prevent >>>>>>>> concurrent execution of placing the tag, and on Iceberg to store it. >>>>>>>> >>>>>>>> This not uses the tags as intended, but seems like a better >>>>>>>> solution than adding/removing table properties which would clutter the >>>>>>>> table history with technical data. >>>>>>>> >>>>>>>> Your thoughts? Any other suggestions/solutions would be welcome. >>>>>>>> >>>>>>>> Thanks, >>>>>>>> Peter >>>>>>>> >>>>>>>> [1] >>>>>>>> https://iceberg.apache.org/docs/latest/java-api-quickstart/#branching-and-tagging >>>>>>>> >>>>>>>> On Thu, Mar 28, 2024, 14:44 Péter Váry <peter.vary.apa...@gmail.com> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> Hi Team, >>>>>>>>> >>>>>>>>> As discussed on yesterday's community sync, I am working on adding >>>>>>>>> a possibility to the Flink Iceberg connector to run maintenance tasks >>>>>>>>> on >>>>>>>>> the Iceberg tables. This will fix the small files issues and in the >>>>>>>>> long >>>>>>>>> run help compacting the high number of positional and equality deletes >>>>>>>>> created by Flink tasks writing CDC data to Iceberg tables without the >>>>>>>>> need >>>>>>>>> of Spark in the infrastructure. >>>>>>>>> >>>>>>>>> I did some planning, prototyping and currently trying out the >>>>>>>>> solution on a larger scale. >>>>>>>>> >>>>>>>>> I put together a document how my current solution looks like: >>>>>>>>> >>>>>>>>> https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit?usp=sharing >>>>>>>>> >>>>>>>>> I would love to hear your thoughts and feedback on this to find a >>>>>>>>> good final solution. >>>>>>>>> >>>>>>>>> Thanks, >>>>>>>>> Peter >>>>>>>>> >>>>>>>>