Hi Ajantha,

I thought about enabling post-commit topology based compaction for sinks using options, like we use for the parametrization of streaming reads [1]. Because of the high number of parameters it will be hard to do in a user-friendly way, but I think it is a possible solution with sensible defaults.
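As a strawman, the read side already works like this (the options are from [1]), and the sink side could mirror it. The compaction option names below are made up for illustration only:

    -- streaming read parametrized through hints, as documented in [1]
    SELECT * FROM t /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s') */;

    -- hypothetical sink equivalent; these compaction options do not exist (yet)
    INSERT INTO t /*+ OPTIONS('compaction-enabled'='true', 'compaction-target-file-size-bytes'='134217728') */
    SELECT * FROM source_t;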
There is a batch-like solution for data file compaction already available [2], but I do not see how we could extend Flink SQL to be able to call it.
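For reference, calling the existing action from the Java API looks roughly like this (a sketch following [2] and the Flink maintenance docs; untested, and the warehouse path is just a placeholder):

    import org.apache.iceberg.Table;
    import org.apache.iceberg.flink.TableLoader;
    import org.apache.iceberg.flink.actions.Actions;

    TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
    Table table = tableLoader.loadTable();

    // Compacts the table's small data files into fewer, larger files
    Actions.forTable(table)
        .rewriteDataFiles()
        .execute();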
Writing to a branch using Flink SQL should be another thread, but my first guess is that it shouldn't be hard to implement using options, like /*+ OPTIONS('branch'='b1') */, since writing to a branch is already working through the Java API [3].

Thanks,
Peter

1 - https://iceberg.apache.org/docs/latest/flink-queries/#flink-streaming-read
2 - https://github.com/apache/iceberg/blob/820fc3ceda386149f42db8b54e6db9171d1a3a6d/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java
3 - https://iceberg.apache.org/docs/latest/flink-writes/#branch-writes

On Mon, Apr 1, 2024, 16:30 Ajantha Bhat <ajanthab...@gmail.com> wrote:

> Thanks for the proposal, Peter.
>
> I just wanted to know whether we have any plans for supporting SQL syntax
> for table maintenance (like CALL procedures) for pure Flink SQL users.
> I didn't see any custom SQL parser plugin support in Flink. I also saw
> that branch writes don't have SQL support (only branch reads use options),
> so I am not sure about the roadmap of Iceberg SQL support in Flink.
> Was there any discussion before?
>
> - Ajantha
>
> On Mon, Apr 1, 2024 at 7:51 PM Péter Váry <peter.vary.apa...@gmail.com>
> wrote:
>
>> Hi Manu,
>>
>> Just to clarify:
>> - Are you proposing to create a user facing locking feature in Iceberg,
>> or just something for internal use?
>>
>> I think we shouldn't add locking to Iceberg's user facing scope at this
>> stage. A fully featured locking system has many more features that we
>> would need (priorities, fairness, timeouts, etc.). I could be tempted
>> when we are talking about the REST catalog, but I think that should be
>> further down the road, if ever...
>>
>> About using the tags:
>> - I wholeheartedly agree that using tags is not intuitive, and I see
>> your points in most of your arguments. OTOH, introducing a new
>> requirement (a locking mechanism) seems like a wrong direction to me.
>> - We have already defined a requirement (atomic changes on the table)
>> for the Catalog implementations, which could be used to achieve our
>> goal here.
>> - We also already store technical data in snapshot properties in Flink
>> jobs (JobId, OperatorId, CheckpointId). Maybe technical tags/table
>> properties are not a big stretch.
>>
>> Or we can look at these tags or metadata as 'technical data' which is
>> internal to Iceberg, and shouldn't be expressed on the external API. My
>> concern is:
>> - Would it be used often enough to be worth the additional complexity?
>>
>> Knowing that Spark compaction is struggling with the same issue is a
>> good indicator, but we would probably need more use cases before
>> introducing a new feature with this complexity, or a simpler solution.
>>
>> Thanks, Peter
>>
>> On Mon, Apr 1, 2024, 10:18 Manu Zhang <owenzhang1...@gmail.com> wrote:
>>
>>>> What would the community think of exploiting tags for preventing
>>>> concurrent maintenance loop executions?
>>>
>>> This issue is not specific to Flink maintenance jobs. We have a service
>>> scheduling Spark maintenance jobs by watching table commits. When we
>>> don't check for in-progress maintenance jobs on the same table, multiple
>>> jobs will run concurrently and have conflicts.
>>>
>>> Basically, I think we need a lock mechanism like the metastore lock
>>> <https://iceberg.apache.org/docs/nightly/configuration/#hadoop-configuration>
>>> if we want to handle it for users. However, using tags doesn't look
>>> intuitive to me. We are also mixing user data with system metadata.
>>> Maybe we can define some general interfaces and leave the implementation
>>> to users in the first version.
>>>
>>> Regards,
>>> Manu
>>>
>>> On Fri, Mar 29, 2024 at 1:59 PM Péter Váry <peter.vary.apa...@gmail.com>
>>> wrote:
>>>
>>>> What would the community think of exploiting tags for preventing
>>>> concurrent maintenance loop executions?
>>>>
>>>> The issue:
>>>> Some maintenance tasks can't run in parallel, like DeleteOrphanFiles
>>>> vs. ExpireSnapshots, or RewriteDataFiles vs. RewriteManifestFiles. We
>>>> make sure that the tasks started by a single trigger do not run
>>>> concurrently by serializing them, but there are no loops in Flink, so
>>>> we can't synchronize with the tasks started by the next trigger.
>>>>
>>>> In the document, I describe that we need to rely on the user to ensure
>>>> that the rate limit is high enough to prevent concurrent triggers.
>>>>
>>>> Proposal:
>>>> When firing a trigger, the RateLimiter could check and create an
>>>> Iceberg table tag [1] for the current table snapshot, with the name
>>>> '__flink_maintenance'. When the execution finishes, we remove this tag.
>>>> The RateLimiter keeps accumulating changes and doesn't fire new
>>>> triggers while it finds this tag on the table. (A rough sketch of the
>>>> tag calls is at the bottom of this mail.)
>>>> The solution relies on Flink's 'forceNonParallel' to prevent concurrent
>>>> execution of placing the tag, and on Iceberg to store it.
>>>>
>>>> This does not use tags as intended, but it seems like a better solution
>>>> than adding/removing table properties, which would clutter the table
>>>> history with technical data.
>>>>
>>>> Your thoughts? Any other suggestions/solutions would be welcome.
>>>>
>>>> Thanks,
>>>> Peter
>>>>
>>>> [1]
>>>> https://iceberg.apache.org/docs/latest/java-api-quickstart/#branching-and-tagging
>>>>
>>>> On Thu, Mar 28, 2024, 14:44 Péter Váry <peter.vary.apa...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Team,
>>>>>
>>>>> As discussed on yesterday's community sync, I am working on adding the
>>>>> possibility to run maintenance tasks on Iceberg tables from the Flink
>>>>> Iceberg connector. This will fix the small files issue and, in the
>>>>> long run, help compact the high number of positional and equality
>>>>> deletes created by Flink tasks writing CDC data to Iceberg tables,
>>>>> without the need for Spark in the infrastructure.
>>>>>
>>>>> I did some planning and prototyping, and I am currently trying out the
>>>>> solution on a larger scale.
>>>>>
>>>>> I put together a document describing my current solution:
>>>>> https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit?usp=sharing
>>>>>
>>>>> I would love to hear your thoughts and feedback on this, to find a
>>>>> good final solution.
>>>>>
>>>>> Thanks,
>>>>> Peter
>>>>>
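>>>> P.S.: A minimal sketch of the tag bookkeeping I have in mind.
>>>> createTag/removeTag are the existing ManageSnapshots calls from [1];
>>>> everything around them (method names, the RateLimiter hook) is only
>>>> illustrative:
>>>>
>>>>     private static final String LOCK_TAG = "__flink_maintenance";
>>>>
>>>>     // Called by the RateLimiter before firing a new trigger;
>>>>     // runs with forceNonParallel, so the check-then-create is safe
>>>>     boolean tryAcquire(Table table) {
>>>>       table.refresh();
>>>>       if (table.refs().containsKey(LOCK_TAG)) {
>>>>         return false; // a previous maintenance run is still in progress
>>>>       }
>>>>       table.manageSnapshots()
>>>>           .createTag(LOCK_TAG, table.currentSnapshot().snapshotId())
>>>>           .commit();
>>>>       return true;
>>>>     }
>>>>
>>>>     // Called when the last maintenance task of the trigger finishes
>>>>     void release(Table table) {
>>>>       table.manageSnapshots().removeTag(LOCK_TAG).commit();
>>>>     }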