If this is specific to solving the problem that there is no notification when a task finishes in Flink, then I think it makes sense to use a JDBC lock. I'd prefer that this not add the tag-based locking strategy because I think that has the potential to be misunderstood by people using the library and misused.
On Wed, Aug 7, 2024 at 1:54 AM Péter Váry <peter.vary.apa...@gmail.com> wrote:

> Hi Anton, nice to hear from you!
> Thanks Ryan for your continued interest!

> You can find my answers below:

>> Am I right to say the proposal has the following high-level goals:
>> - Perform cheap maintenance actions periodically after commits using the same cluster (suitable for things like rewriting manifests, compacting tiny data files).

> Yes

>> - Offer an ability to run a Flink monitor service that would listen to table changes and trigger appropriate actions (suitable for more expensive operations like merging equality deletes into data files, removing orphan files)

> I would rephrase this a bit differently: offer an ability to create a Flink Table Maintenance service which will listen to the table changes and trigger and execute the appropriate actions. I think the main difference here is that the whole monitoring/scheduling/executing is coupled into a single job.

>> - Support a way to prevent running duplicated and conflicting maintenance actions.

> This is part of both of the previous goals. For example:
> - Even cheap maintenance actions could conflict; rewriting manifests and compacting tiny data files conflict with each other
> - Big maintenance actions could take a long time, and they could easily end up in a situation where the next action is scheduled but conflicts with the one already running

>> If my understanding is correct, can we elaborate on the exact use cases when we will schedule duplicated and conflicting maintenance actions? Are we talking about conflicts between data operations and maintenance or between different maintenance actions?

> Here, we do not talk about conflicts between data operations and maintenance tasks. The goal here is to prevent different, conflicting maintenance tasks from running concurrently.
> One example:
> - Cheap maintenance is enabled on the Flink Sink
> - Compacting tiny files is started
> - Rewriting manifest files is scheduled - but should not start until the tiny file compaction is finished. If the manifest rewrite starts anyway and the tiny file compaction finishes first, the manifest rewrite will not be able to commit, as the current list of manifest files has changed.

> Another example:
> - A big maintenance job is running
> - Delete Orphan Files is started
> - Expire Snapshots is scheduled - but should not start until the Delete Orphan Files task is finished. If we start Expire Snapshots, then we might remove metadata files which are still being read by the Delete Orphan Files task.

>>> If nobody else has a better idea, then I will add a default JDBC based locking implementation to the PR

>> Do you mean an implementation of `LockManager` in core or something specific to this Flink application?

> This would be a Flink-only interface: TriggerLockFactory and TriggerLockFactory.Lock [1]. See the proposed JDBC implementation PR [2]. If other engines want to use it, then we could move it to iceberg-core, but I don't see the need for it ATM.

> Thanks,
> Peter

> [1] - https://github.com/apache/iceberg/pull/10484/files#diff-06332fe8ce41f0708e6ab59484ea88dfb86ff5eaa35c9a4e76110e9fb1a290ca
> [2] - https://github.com/apache/iceberg/pull/10484/files#diff-5aa5681994bb389833582619220b5f339fa416b60b4535ce7492c5fb32fc417d
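For illustration only, a minimal sketch of what such a JDBC-backed lock could look like is shown below. It is not the implementation from PR 10484; the `SimpleJdbcLock` class, the `maintenance_lock` table, and its columns are assumed names, and the actual `TriggerLockFactory.Lock` interface in the PR may differ. The idea is simply that an INSERT into a table with a primary key gives an atomic "try lock", and a DELETE releases it.

```java
// Illustrative sketch only: a JDBC-backed lock in the spirit of the proposed
// TriggerLockFactory.Lock. Table name, column names, and this class are assumptions.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class SimpleJdbcLock {
  // Assumed schema: CREATE TABLE maintenance_lock (table_name VARCHAR PRIMARY KEY, lock_id VARCHAR)
  private final String jdbcUrl;
  private final String tableIdentifier;

  public SimpleJdbcLock(String jdbcUrl, String tableIdentifier) {
    this.jdbcUrl = jdbcUrl;
    this.tableIdentifier = tableIdentifier;
  }

  /** Tries to acquire the lock by inserting a row; the primary key makes this atomic. */
  public boolean tryLock(String lockId) {
    try (Connection conn = DriverManager.getConnection(jdbcUrl);
        PreparedStatement ps =
            conn.prepareStatement("INSERT INTO maintenance_lock (table_name, lock_id) VALUES (?, ?)")) {
      ps.setString(1, tableIdentifier);
      ps.setString(2, lockId);
      return ps.executeUpdate() == 1;
    } catch (SQLException e) {
      // A primary key violation means another maintenance task already holds the lock
      return false;
    }
  }

  /** Checks whether the lock is currently held. */
  public boolean isHeld() {
    try (Connection conn = DriverManager.getConnection(jdbcUrl);
        PreparedStatement ps =
            conn.prepareStatement("SELECT 1 FROM maintenance_lock WHERE table_name = ?")) {
      ps.setString(1, tableIdentifier);
      return ps.executeQuery().next();
    } catch (SQLException e) {
      throw new RuntimeException("Failed to check maintenance lock", e);
    }
  }

  /** Releases the lock by deleting the row for this table. */
  public void unlock() {
    try (Connection conn = DriverManager.getConnection(jdbcUrl);
        PreparedStatement ps =
            conn.prepareStatement("DELETE FROM maintenance_lock WHERE table_name = ?")) {
      ps.setString(1, tableIdentifier);
      ps.executeUpdate();
    } catch (SQLException e) {
      throw new RuntimeException("Failed to release maintenance lock", e);
    }
  }
}
```

A database that is already available to the deployment (for example, the one backing a JDBC catalog) could host such a table, so no new infrastructure is strictly required.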
> On Tue, Aug 6, 2024 at 22:37, Ryan Blue <b...@databricks.com.invalid> wrote:

>>> If nobody else has a better idea, then I will add a default JDBC based locking implementation to the PR

>> Do you mean an implementation of `LockManager` in core or something specific to this Flink application?

>> On Tue, Aug 6, 2024 at 2:28 AM Péter Váry <peter.vary.apa...@gmail.com> wrote:

>>>>> We can make sure that the Tasks can tolerate concurrent runs, but as mentioned in the doc, in most cases having concurrent runs is a waste of resources, because of the commit conflicts.

>>>> Is the problem that users may configure multiple jobs that are all trying to run maintenance procedures? If so, isn't this a user-level problem? If you don't want maintenance job conflicts, only run one maintenance job.

>>> There are conflicts even when a single streaming job schedules the tasks.
>>> One example:
>>> - A streaming job writes rows to an append-only table, and the Table Maintenance is scheduled in the PostCommitTopology
>>> - DataFile rewrite is scheduled every X commits - to concatenate small files
>>> - ManifestFile rewrite is scheduled every Y commits - to decrease the number of manifest files

>>> DataFile rewrite will create a new manifest file. This means that if a DataFile rewrite task is finished and committed, and there is a concurrent ManifestFile rewrite, then the ManifestFile rewrite will fail. I have played around with serializing the Maintenance Tasks (which resulted in very ugly, hard-to-maintain, but working code). This serialization mitigated some of the issues, but there was still one remaining problem. If there was a delay in executing the rewrites (job restart/resource deprivation/longer than expected Maintenance Task run), sometimes a consecutively scheduled DataFile/ManifestFile rewrite was already started while the previous DataFile/ManifestFile rewrite was still running. This again resulted in failures.

>>> If we accept that locking is a requirement, then we can have:
>>> - Simpler code for the flow - an easy to understand flow
>>> - Simpler code for the tasks - no need to separate messages for the different runs
>>> - No failures in the logs - failures are problematic when swallowed, and make it hard to identify real issues when logged

>>>> I don't think that an Iceberg solution is a good choice here. Coordination in a system that does work on Iceberg tables does not need to rely on locking in the Iceberg tables. It should have a way to coordinate externally.
>>>> [..]
>>>> I agree with Ryan that an Iceberg solution is not a good choice here.

>>> I agree that we don't need to *rely* on locking in the Iceberg tables. The `TriggerLockFactory.Lock` interface is specifically designed to allow the user to choose the preferred type of locking. I was trying to come up with a solution where Apache Iceberg users don't need to rely on one more external system for the Flink Table Maintenance to work.

>>> I understand that this discussion is very similar to the HadoopCatalog situation, where we have a hacky "solution" which works in some cases, but is suboptimal. And I understand if we don't want to support it.

>>> If nobody else has a better idea, then I will add a default JDBC based locking implementation to the PR [1].

>>> Thanks,
>>> Peter

>>> [1] https://github.com/apache/iceberg/pull/10484
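As a rough sketch of how accepting locking as a requirement simplifies the flow described above, a triggered maintenance task could be gated on the lock and simply skipped or deferred while a previous task is still running, instead of failing at commit time. This is purely illustrative and not code from the proposal; `SimpleJdbcLock` refers to the hypothetical sketch earlier in this thread.

```java
// Illustrative sketch: gate a triggered maintenance task with the lock so a new
// trigger is skipped while a previous task still runs. SimpleJdbcLock is the
// hypothetical class from the earlier sketch; the task is passed in as a Runnable.
import java.util.UUID;

public class GuardedMaintenanceRunner {
  private final SimpleJdbcLock lock;

  public GuardedMaintenanceRunner(SimpleJdbcLock lock) {
    this.lock = lock;
  }

  /** Runs the task only if no other maintenance task currently holds the lock. */
  public boolean runIfFree(Runnable maintenanceTask) {
    if (!lock.tryLock(UUID.randomUUID().toString())) {
      // Another maintenance task is still running; skip or re-schedule this trigger
      return false;
    }
    try {
      maintenanceTask.run(); // e.g. data file compaction or manifest rewrite
      return true;
    } finally {
      lock.unlock(); // always release so the next trigger can proceed
    }
  }
}
```

This is what removes the commit-conflict failures from the logs: the conflicting run simply never starts.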
>>> On Tue, Aug 6, 2024 at 6:25 AM Manu Zhang <owenzhang1...@gmail.com> wrote:

>>>> Hi Peter,

>>>> We rely on Airflow to schedule and coordinate maintenance Spark jobs. I agree with Ryan that an Iceberg solution is not a good choice here.

>>>> Thanks,
>>>> Manu

>>>> On Tue, Aug 6, 2024 at 1:07 AM Ryan Blue <b...@databricks.com.invalid> wrote:

>>>>>> We can make sure that the Tasks can tolerate concurrent runs, but as mentioned in the doc, in most cases having concurrent runs is a waste of resources, because of the commit conflicts.

>>>>> Is the problem that users may configure multiple jobs that are all trying to run maintenance procedures? If so, isn't this a user-level problem? If you don't want maintenance job conflicts, only run one maintenance job.

>>>>>> If Spark/Kafka/Flink users all need some kind of locking, it might be worth considering coming up with an Iceberg solution for it.

>>>>> I don't think that an Iceberg solution is a good choice here. Coordination in a system that does work on Iceberg tables does not need to rely on locking in the Iceberg tables. It should have a way to coordinate externally.

>>>>> On Sun, Aug 4, 2024 at 10:29 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:

>>>>>> Thanks everyone for your answers! I really appreciate it, especially since these came in during the weekend, on your own time.

>>>>>> @Manu, during our initial discussion, you mentioned that you had similar issues with Spark compactions. You needed locking there. Is it still an issue?

>>>>>> If Spark/Kafka/Flink users all need some kind of locking, it might be worth considering coming up with an Iceberg solution for it.
>>>>>> I see the following possibilities:
>>>>>> 1. Create a locking feature using the current Catalog API: We could create a 'lock' instead of a 'tag', and we could use the Catalog's atomic change requirement to make locking atomic. My main concern with this approach is that it relies on the linear history of the table and produces more contention on the write side. I was OK with this in the very specific use case of Flink Table Maintenance (few changes, controlled times), but as a general solution, I would look somewhere else.
>>>>>> 2. Add locking to the Catalog API as a requirement: We could widen the Catalog API with an optional lock/unlock feature. I'm not sure that we see enough use cases to merit such a big change. OTOH this could be a good additional feature for the REST Catalog as well, and maybe something we would like to add to our Catalog sooner or later.

>>>>>> About the specific case: if we want to integrate Flink Table Maintenance into the streaming Flink sinks - and I consider that one of the main use cases - we can't rely on external schedulers to prevent concurrent writes. We can make sure that the Tasks can tolerate concurrent runs, but as mentioned in the doc, in most cases having concurrent runs is a waste of resources, because of the commit conflicts.

>>>>>> If we decide to pursue the Flink-only solution, I would implement a JDBC based locking implementation for the LockFactory interface based on the feedback. This seems like the most natural and available external storage for locks. Later we could add more implementations based on the user needs.
>>>>>> Thanks for the feedback and discussion,
>>>>>> Peter

>>>>>> On Mon, Aug 5, 2024, 05:18 Manu Zhang <owenzhang1...@gmail.com> wrote:

>>>>>>> I'm not familiar with Flink, so I'm wondering how Flink resolves concurrency issues in common Flink use cases. For example, how does Flink prevent two jobs from writing to the same file?

>>>>>>> On the other hand, an Iceberg tag is ultimately an atomic change to a file. It's the same as using a file lock. I don't think we can handle such cases without relying on external dependencies. We may define interfaces in Iceberg and let users choose whatever services they can use to implement it. My $0.01.

>>>>>>> Manu

>>>>>>> On Mon, Aug 5, 2024 at 8:38 AM Steven Wu <stevenz...@gmail.com> wrote:

>>>>>>>> I also don't feel it is the best fit to use tags to implement locks for passing control messages. This is the main sticking point for me from the design doc. However, we haven't been able to come up with a better solution yet. Maybe we need to go back to the drawing board again.

>>>>>>>> I am also not sure using Kafka topics to send control messages is a good fit either. It would introduce a dependency on Kafka to run Flink maintenance jobs. It works for the Kafka Connect sink, because that is in a Kafka environment anyway.

>>>>>>>> On Sun, Aug 4, 2024 at 1:12 PM Ryan Blue <b...@databricks.com.invalid> wrote:

>>>>>>>>> Hi Péter, thanks for bringing this up.

>>>>>>>>> I don't think using a tag to "lock" a table is a good idea. The doc calls out that this is necessary "Since Flink doesn’t provide an out of the box solution for downstream operators sending feedback to upstream operators", so this feels like using Iceberg metadata as a side-channel within a Flink application. That doesn't seem like a good idea to me.

>>>>>>>>> Why not use a separate Kafka topic to send control messages for this purpose, like what is done in the Kafka Connect sink? I think that's a cleaner way to solve the problem if there is not going to be a way to fix it in Flink.

>>>>>>>>> Ryan

>>>>>>>>> On Wed, Jul 31, 2024 at 7:45 AM Péter Váry <peter.vary.apa...@gmail.com> wrote:

>>>>>>>>>> Hi Team,

>>>>>>>>>> During the discussion around the Flink Table Maintenance [1], [2], I highlighted that one of the main decision points is the way we prevent conflicting Maintenance Tasks from running concurrently.

>>>>>>>>>> At that time we did not find a better solution than providing an interface for locking, and providing a basic implementation which is based on Iceberg tags [3]:

>>>>>>>>>> *"For convenience an Iceberg tag based solution is provided, so no external dependencies are needed. On lock creation a tag named '__flink_maitenance' is created for the current snapshot, and on lock removal this tag is removed. This solution is not ideal, as it creates multiple new versions of the table metadata, and mixes infrastructural metadata with user metadata. If this is not acceptable then other implementations could be created which could use external components like JDBC/Kafka/ZooKeeper."*
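To picture the tag-based mechanism described in the quoted passage, a minimal sketch using the Iceberg Table API might look like the following. The `TagBasedLock` class is a hypothetical illustration, not code from the proposal or the PR; only the tag name comes from the design doc, and the sketch assumes the table already has a current snapshot.

```java
// Illustrative sketch of the tag-based locking idea described above: creating a
// tag on the current snapshot acts as the lock, removing it releases the lock.
import org.apache.iceberg.Table;

public class TagBasedLock {
  private static final String LOCK_TAG = "__flink_maitenance"; // tag name from the design doc

  private final Table table;

  public TagBasedLock(Table table) {
    this.table = table;
  }

  /** Tries to take the lock by creating the tag; the catalog's atomic commit arbitrates races. */
  public boolean tryLock() {
    table.refresh();
    if (table.refs().containsKey(LOCK_TAG)) {
      return false; // someone else already holds the lock
    }
    try {
      table.manageSnapshots()
          .createTag(LOCK_TAG, table.currentSnapshot().snapshotId())
          .commit();
      return true;
    } catch (Exception e) {
      // A concurrent commit created the tag first (or the commit failed); treat as "not acquired"
      return false;
    }
  }

  /** Releases the lock by removing the tag. */
  public void unlock() {
    table.manageSnapshots().removeTag(LOCK_TAG).commit();
  }
}
```

Every tryLock/unlock here is a metadata commit on the table, which is exactly the drawback the quoted passage calls out and one reason much of this thread prefers an external (e.g. JDBC) lock.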
>>>>>>>>>> We are in the implementation phase, and we agreed with Steven to take one final round with the community, to see if anyone has a better suggestion, or whether we can proceed with the originally agreed one.

>>>>>>>>>> So if you have a better idea, please share.

>>>>>>>>>> Thanks,
>>>>>>>>>> Peter

>>>>>>>>>> [1] - https://lists.apache.org/thread/qjf83v2xj6lxb9sr8z0v9p0979f8wsmf
>>>>>>>>>> [2] - https://lists.apache.org/thread/vjf8m5wg840o58yz4y3q35k2mfhbm49l
>>>>>>>>>> [3] - https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit#heading=h.lt9eaimi6zyz

>>>>>>>>> --
>>>>>>>>> Ryan Blue
>>>>>>>>> Databricks

>>>>> --
>>>>> Ryan Blue
>>>>> Databricks

>> --
>> Ryan Blue
>> Databricks

--
Ryan Blue
Databricks