Thanks for the proposal, Peter. I just wanted to know: do we have any plans to support SQL syntax for table maintenance (like CALL procedures) for pure Flink SQL users? I didn't see any custom SQL parser plugin support in Flink. I also saw that branch writes don't have SQL support (only branch reads, via options), so I am not sure about the roadmap for Iceberg SQL support in Flink. Was there any discussion about this before?
- Ajantha

On Mon, Apr 1, 2024 at 7:51 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:

> Hi Manu,
>
> Just to clarify:
> - Are you proposing to create a user-facing locking feature in Iceberg, or just something for internal use?
>
> I think we shouldn't add locking to Iceberg's user-facing scope at this stage. A fully featured locking system has many more features that we would need (priorities, fairness, timeouts, etc.). I could be tempted when we are talking about the REST catalog, but I think that should be further down the road, if ever...
>
> About using the tags:
> - I wholeheartedly agree that using tags is not intuitive, and I see your point in most of your arguments. OTOH, introducing a new requirement (a locking mechanism) seems like the wrong direction to me.
> - We already defined a requirement (atomic changes on the table) for the Catalog implementations, which could be used to achieve our goal here.
> - We also already store technical data in snapshot properties in Flink jobs (JobId, OperatorId, CheckpointId). Maybe technical tags/table properties are not a big stretch.
>
> Or we can look at these tags or metadata as 'technical data' which is internal to Iceberg and shouldn't be expressed on the external API. My concern is:
> - Would it be used often enough to be worth the additional complexity?
>
> Knowing that Spark compaction is struggling with the same issue is a good indicator, but we would probably need more use cases before introducing a new feature with this complexity, or a simpler solution.
>
> Thanks, Peter
>
>
> On Mon, Apr 1, 2024, 10:18 Manu Zhang <owenzhang1...@gmail.com> wrote:
>
>>> What would the community think of exploiting tags for preventing concurrent maintenance loop executions.
>>
>> This issue is not specific to Flink maintenance jobs. We have a service scheduling Spark maintenance jobs by watching table commits. When we don't check for in-progress maintenance jobs on the same table, multiple jobs will run concurrently and conflict.
>>
>> Basically, I think we need a lock mechanism like the metastore lock
>> <https://iceberg.apache.org/docs/nightly/configuration/#hadoop-configuration>
>> if we want to handle it for users. However, using a tag doesn't look intuitive to me. We are also mixing user data with system metadata. Maybe we can define some general interfaces and leave the implementation to users in the first version.
>>
>> Regards,
>> Manu
>>
>>
>> On Fri, Mar 29, 2024 at 1:59 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>>
>>> What would the community think of exploiting tags for preventing concurrent maintenance loop executions?
>>>
>>> The issue:
>>> Some maintenance tasks can't run in parallel, like DeleteOrphanFiles vs. ExpireSnapshots, or RewriteDataFiles vs. RewriteManifestFiles. We make sure not to run tasks started by a single trigger concurrently by serializing them, but there are no loops in Flink, so we can't synchronize with the tasks started by the next trigger.
>>>
>>> In the document, I describe that we need to rely on the user to ensure that the rate limit is high enough to prevent concurrent triggers.
>>>
>>> Proposal:
>>> When firing a trigger, the RateLimiter could check for and create an Iceberg table tag [1] on the current table snapshot, with the name '__flink_maintenance'. When the execution finishes, we remove this tag. The RateLimiter keeps accumulating changes and doesn't fire new triggers while it finds this tag on the table.
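To make the quoted proposal a bit more concrete, here is a minimal sketch of how such a tag-based guard could look with Iceberg's Java API. The class name, method names, and error handling are illustrative only (they are not part of the proposal or of any existing connector code), and the exact Table calls such as refs() depend on the Iceberg version in use.

    import org.apache.iceberg.Table;

    // Minimal sketch of the tag-based guard described in the proposal above.
    // A real implementation would live inside the RateLimiter operator and
    // handle commit conflicts and retries.
    public class MaintenanceTagGuard {

      // Tag name taken from the proposal above.
      private static final String MAINTENANCE_TAG = "__flink_maintenance";

      /** Try to mark the table as "maintenance in progress"; returns false if it is already marked. */
      public static boolean tryAcquire(Table table) {
        table.refresh();
        if (table.currentSnapshot() == null || table.refs().containsKey(MAINTENANCE_TAG)) {
          // Nothing to tag yet, or another maintenance run still holds the tag.
          return false;
        }
        // Relies on the catalog's atomic commit requirement: a concurrent attempt
        // to create the same tag is expected to fail, so at most one trigger proceeds.
        table.manageSnapshots()
            .createTag(MAINTENANCE_TAG, table.currentSnapshot().snapshotId())
            .commit();
        return true;
      }

      /** Remove the tag once all maintenance tasks started by this trigger have finished. */
      public static void release(Table table) {
        table.manageSnapshots().removeTag(MAINTENANCE_TAG).commit();
      }
    }
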
>>> The solution relies on Flink's 'forceNonParallel' to prevent concurrent execution of placing the tag, and on Iceberg to store it.
>>>
>>> This does not use tags as they were intended, but it seems like a better solution than adding/removing table properties, which would clutter the table history with technical data.
>>>
>>> Your thoughts? Any other suggestions/solutions would be welcome.
>>>
>>> Thanks,
>>> Peter
>>>
>>> [1] https://iceberg.apache.org/docs/latest/java-api-quickstart/#branching-and-tagging
>>>
>>> On Thu, Mar 28, 2024, 14:44 Péter Váry <peter.vary.apa...@gmail.com> wrote:
>>>
>>>> Hi Team,
>>>>
>>>> As discussed on yesterday's community sync, I am working on adding the possibility to run maintenance tasks on Iceberg tables from the Flink Iceberg connector. This will fix the small-files issue and, in the long run, help compact the high number of positional and equality deletes created by Flink tasks writing CDC data to Iceberg tables, without needing Spark in the infrastructure.
>>>>
>>>> I did some planning and prototyping, and I am currently trying out the solution on a larger scale.
>>>>
>>>> I put together a document describing how my current solution looks:
>>>> https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit?usp=sharing
>>>>
>>>> I would love to hear your thoughts and feedback on this so we can find a good final solution.
>>>>
>>>> Thanks,
>>>> Peter
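Manu's suggestion above, of defining general interfaces and leaving the implementation to users, might look roughly like the following hypothetical interface. Nothing like this exists in Iceberg or the Flink connector today; the names are made up for illustration.

    // Hypothetical pluggable lock for coordinating maintenance jobs across engines;
    // not an existing Iceberg or Flink connector API.
    public interface MaintenanceLock {

      /** Try to take the maintenance lock for the given table; returns true if acquired. */
      boolean tryLock(String tableName);

      /** Release a previously acquired maintenance lock for the given table. */
      void unlock(String tableName);
    }

One implementation could be backed by the tag mechanism from the proposal, while users who already rely on a metastore lock or an external coordination service could plug in their own.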