> If nobody else has a better idea, then I will add a default JDBC based locking implementation to the PR
Do you mean an implementation of `LockManager` in core, or something specific to this Flink application?

On Tue, Aug 6, 2024 at 2:28 AM Péter Váry <peter.vary.apa...@gmail.com> wrote:

> > > We can make sure that the Tasks can tolerate concurrent runs, but as
> > > mentioned in the doc, in most cases having concurrent runs is a waste
> > > of resources, because of the commit conflicts.
> >
> > Is the problem that users may configure multiple jobs that are all
> > trying to run maintenance procedures? If so, isn't this a user-level
> > problem? If you don't want maintenance job conflicts, only run one
> > maintenance job.
>
> There are conflicts even when a single streaming job schedules the tasks.
> One example:
> - The streaming job writes rows to an append-only table, and Table
>   Maintenance is scheduled in the PostCommitTopology
> - A DataFile rewrite is scheduled every X commits - to concatenate small
>   files
> - A ManifestFile rewrite is scheduled every Y commits - to decrease the
>   number of manifest files
>
> A DataFile rewrite creates a new manifest file. This means that if a
> DataFile rewrite task finishes and commits while a ManifestFile rewrite is
> running concurrently, the ManifestFile rewrite will fail. I have played
> around with serializing the Maintenance Tasks (which resulted in very
> ugly, hard-to-maintain, but working code). This serialization mitigated
> some of the issues, but one problem remained: if there was a delay in
> executing the rewrites (job restart, resource deprivation, or a
> longer-than-expected Maintenance Task run), sometimes a consecutively
> scheduled DataFile/ManifestFile rewrite had already started while the
> previous DataFile/ManifestFile rewrite was still running. This again
> resulted in failures.
>
> If we accept that locking is a requirement, then we can have:
> - Simpler code for the flow - an easy-to-understand flow
> - Simpler code for the tasks - no need to separate the messages for the
>   different runs
> - No failures in the logs - failures are problematic when swallowed, and
>   make it hard to identify real issues when logged
>
> > I don't think that an Iceberg solution is a good choice here.
> > Coordination in a system that does work on Iceberg tables does not need
> > to rely on locking in the Iceberg tables. It should have a way to
> > coordinate externally.
> [..]
> > I agree with Ryan that an Iceberg solution is not a good choice here.
>
> I agree that we don't need to *rely* on locking in the Iceberg tables.
> The `TriggerLockFactory.Lock` interface is specifically designed to allow
> the user to choose the preferred type of locking. I was trying to come up
> with a solution where Apache Iceberg users don't need to rely on one more
> external system for Flink Table Maintenance to work.
>
> I understand that this discussion is very similar to the HadoopCatalog
> situation, where we have a hacky "solution" which works in some cases but
> is suboptimal. And I understand if we don't want to support it.
>
> If nobody else has a better idea, then I will add a default JDBC-based
> locking implementation to the PR [1].
>
> Thanks,
> Peter
>
> [1] https://github.com/apache/iceberg/pull/10484
>
> On Tue, Aug 6, 2024 at 6:25 Manu Zhang <owenzhang1...@gmail.com> wrote:
>
>> Hi Peter,
>>
>> We rely on Airflow to schedule and coordinate maintenance Spark jobs. I
>> agree with Ryan that an Iceberg solution is not a good choice here.
>>
>> Thanks,
>> Manu
>>
>> On Tue, Aug 6, 2024 at 1:07 AM Ryan Blue <b...@databricks.com.invalid>
>> wrote:
>>
>>> > We can make sure that the Tasks can tolerate concurrent runs, but as
>>> > mentioned in the doc, in most cases having concurrent runs is a waste
>>> > of resources, because of the commit conflicts.
>>>
>>> Is the problem that users may configure multiple jobs that are all
>>> trying to run maintenance procedures? If so, isn't this a user-level
>>> problem? If you don't want maintenance job conflicts, only run one
>>> maintenance job.
>>>
>>> > If Spark/Kafka/Flink users all need some kind of locking, it might be
>>> > worth considering coming up with an Iceberg solution for it.
>>>
>>> I don't think that an Iceberg solution is a good choice here.
>>> Coordination in a system that does work on Iceberg tables does not need
>>> to rely on locking in the Iceberg tables. It should have a way to
>>> coordinate externally.
>>>
>>> On Sun, Aug 4, 2024 at 10:29 PM Péter Váry
>>> <peter.vary.apa...@gmail.com> wrote:
>>>
>>>> Thanks everyone for your answers! I really appreciate it, especially
>>>> since these came in during the weekend, on your own time.
>>>>
>>>> @Manu, during our initial discussion, you mentioned that you had
>>>> similar issues with Spark compactions, and that you needed locking
>>>> there. Is it still an issue?
>>>>
>>>> If Spark/Kafka/Flink users all need some kind of locking, it might be
>>>> worth considering coming up with an Iceberg solution for it.
>>>> I see the following possibilities:
>>>> 1. Create a locking feature using the current Catalog API: We could
>>>> create a 'lock' instead of a 'tag', and we could use the Catalogs'
>>>> atomic change requirement to make locking atomic. My main concern with
>>>> this approach is that it relies on the linear history of the table and
>>>> produces more contention on the write side. I was OK with this in the
>>>> very specific use case of Flink Table Maintenance (few changes,
>>>> controlled times), but as a general solution, I would look somewhere
>>>> else.
>>>> 2. Add locking to the Catalog API as a requirement: We could widen the
>>>> Catalog API with an optional lock/unlock feature. I'm not sure that we
>>>> see enough use cases to merit such a big change.
>>>> OTOH this could be a good additional feature for the REST Catalog as
>>>> well, and maybe something we would like to add to our Catalog sooner
>>>> or later.
>>>>
>>>> About the specific case: if we want to integrate Flink Table
>>>> Maintenance into the streaming Flink sinks - and I consider that one
>>>> of the main use cases - we can't rely on external schedulers to
>>>> prevent concurrent writes. We can make sure that the Tasks can
>>>> tolerate concurrent runs, but as mentioned in the doc, in most cases
>>>> having concurrent runs is a waste of resources, because of the commit
>>>> conflicts.
>>>>
>>>> If we decide to pursue the Flink-only solution, I would implement a
>>>> JDBC-based lock for the LockFactory interface, based on the feedback.
>>>> This seems like the most natural and available external storage for
>>>> locks. Later we could add more implementations based on user needs.
>>>>
>>>> Thanks for the feedback and discussion,
>>>> Peter
>>>>
>>>> On Mon, Aug 5, 2024, 05:18 Manu Zhang <owenzhang1...@gmail.com> wrote:
>>>>
>>>>> Not being familiar with Flink, I'm wondering how Flink resolves
>>>>> concurrency issues in common Flink use cases. For example, how does
>>>>> Flink prevent two jobs from writing to the same file?
>>>>>
>>>>> On the other hand, an Iceberg tag is eventually an atomic change to a
>>>>> file. It's the same as using a file lock. I don't think we can handle
>>>>> such cases without relying on external dependencies. We may define
>>>>> interfaces in Iceberg and let users choose whatever services they can
>>>>> use to implement it. My $0.01.
>>>>>
>>>>> Manu
>>>>>
>>>>> On Mon, Aug 5, 2024 at 8:38 AM Steven Wu <stevenz...@gmail.com> wrote:
>>>>>
>>>>>> I also don't feel that using tags to implement locks for passing
>>>>>> control messages is the best fit. This is the main sticking point
>>>>>> for me in the design doc. However, we haven't been able to come up
>>>>>> with a better solution yet.
>>>>>> Maybe we need to go back to the drawing board again.
>>>>>>
>>>>>> I am also not sure that using Kafka topics to send control messages
>>>>>> is a good fit either. It would introduce a dependency on Kafka to
>>>>>> run Flink maintenance jobs. It works for the Kafka Connect sink,
>>>>>> because that is for a Kafka environment anyway.
>>>>>>
>>>>>> On Sun, Aug 4, 2024 at 1:12 PM Ryan Blue
>>>>>> <b...@databricks.com.invalid> wrote:
>>>>>>
>>>>>>> Hi Péter, thanks for bringing this up.
>>>>>>>
>>>>>>> I don't think using a tag to "lock" a table is a good idea. The doc
>>>>>>> calls out that this is necessary "Since Flink doesn’t provide an
>>>>>>> out of the box solution for downstream operators sending feedback
>>>>>>> to upstream operators", so this feels like using Iceberg metadata
>>>>>>> as a side channel within a Flink application. That doesn't seem
>>>>>>> like a good idea to me.
>>>>>>>
>>>>>>> Why not use a separate Kafka topic to send control messages for
>>>>>>> this purpose, like what is done in the Kafka Connect sink? I think
>>>>>>> that's a cleaner way to solve the problem if there is not going to
>>>>>>> be a way to fix it in Flink.
>>>>>>>
>>>>>>> Ryan
>>>>>>>
>>>>>>> On Wed, Jul 31, 2024 at 7:45 AM Péter Váry
>>>>>>> <peter.vary.apa...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Team,
>>>>>>>>
>>>>>>>> During the discussion around Flink Table Maintenance [1], [2], I
>>>>>>>> highlighted that one of the main decision points is how we prevent
>>>>>>>> Maintenance Tasks from running concurrently.
>>>>>>>>
>>>>>>>> At that time we did not find a better solution than providing an
>>>>>>>> interface for locking, along with a basic implementation based on
>>>>>>>> Iceberg tags [3]:
>>>>>>>>
>>>>>>>> *"For convenience an Iceberg tag based solution is provided, so no
>>>>>>>> external dependencies are needed.
>>>>>>>> On lock creation a tag named '__flink_maintenance' is created for
>>>>>>>> the current snapshot, and on lock removal this tag is removed.
>>>>>>>> This solution is not ideal, as it creates multiple new versions of
>>>>>>>> the table metadata, and mixes infrastructural metadata with user
>>>>>>>> metadata. If this is not acceptable, then other implementations
>>>>>>>> could be created which use external components like
>>>>>>>> JDBC/Kafka/ZooKeeper."*
>>>>>>>>
>>>>>>>> We are in the implementation phase, and Steven and I agreed to
>>>>>>>> take one final round with the community, to see if anyone has a
>>>>>>>> better suggestion, or whether we can proceed with the originally
>>>>>>>> agreed one.
>>>>>>>>
>>>>>>>> So if you have a better idea, please share.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Peter
>>>>>>>>
>>>>>>>> [1] - https://lists.apache.org/thread/qjf83v2xj6lxb9sr8z0v9p0979f8wsmf
>>>>>>>> [2] - https://lists.apache.org/thread/vjf8m5wg840o58yz4y3q35k2mfhbm49l
>>>>>>>> [3] - https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit#heading=h.lt9eaimi6zyz
>>>>>>>
>>>>>>> --
>>>>>>> Ryan Blue
>>>>>>> Databricks
>>>
>>> --
>>> Ryan Blue
>>> Databricks

--
Ryan Blue
Databricks
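The tag-based locking quoted from the design doc works because creating a tag is an atomic metadata change in the catalog: if the tag already exists, the commit fails, so tag creation doubles as lock acquisition. A rough in-memory Python model of that behavior (the real implementation uses Iceberg's Java API; the class and method names here are illustrative, and a mutex stands in for the catalog's atomic commit):

```python
import threading

class TagBasedLock:
    """Models the '__flink_maintenance' tag trick: because the catalog
    commits metadata changes atomically, creating an already-existing tag
    fails, so 'create tag' acts as 'acquire lock' and 'drop tag' as
    'release'. This is an in-memory sketch, not Iceberg's actual API."""

    LOCK_TAG = "__flink_maintenance"

    def __init__(self):
        self._mutex = threading.Lock()      # stand-in for the atomic commit
        self._tags: dict[str, int] = {}     # tag name -> snapshot id

    def try_lock(self, current_snapshot_id: int) -> bool:
        with self._mutex:
            if self.LOCK_TAG in self._tags:
                return False  # tag exists -> another maintenance run holds the lock
            self._tags[self.LOCK_TAG] = current_snapshot_id
            return True

    def unlock(self) -> None:
        with self._mutex:
            self._tags.pop(self.LOCK_TAG, None)

table = TagBasedLock()
print(table.try_lock(42))  # True - tag created, lock held
print(table.try_lock(43))  # False - tag already exists
table.unlock()
print(table.try_lock(43))  # True - free again
```

As the quoted doc notes, the drawback is that against a real catalog every acquire/release pair produces new table metadata versions.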
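The default JDBC-based locking Peter proposes in the thread could follow the classic lock-table pattern: acquire by atomically inserting a row keyed by table name, release by deleting it. A minimal sketch of that pattern, using Python's sqlite3 as a stand-in for the JDBC store (the schema and function names are assumptions, not the actual code in the PR):

```python
import sqlite3

def create_lock_store(conn: sqlite3.Connection) -> None:
    # The PRIMARY KEY constraint is what makes acquisition atomic:
    # only one writer can insert the row for a given table name.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS maintenance_locks ("
        "  table_name TEXT PRIMARY KEY,"
        "  lock_id    TEXT NOT NULL)"
    )

def try_lock(conn: sqlite3.Connection, table_name: str, lock_id: str) -> bool:
    try:
        with conn:  # commit on success, rollback on error
            conn.execute(
                "INSERT INTO maintenance_locks (table_name, lock_id) VALUES (?, ?)",
                (table_name, lock_id),
            )
        return True
    except sqlite3.IntegrityError:
        return False  # row already present -> someone else holds the lock

def unlock(conn: sqlite3.Connection, table_name: str) -> None:
    with conn:
        conn.execute(
            "DELETE FROM maintenance_locks WHERE table_name = ?", (table_name,)
        )

conn = sqlite3.connect(":memory:")
create_lock_store(conn)
print(try_lock(conn, "db.tbl", "job-1"))  # True - lock acquired
print(try_lock(conn, "db.tbl", "job-2"))  # False - already held
unlock(conn, "db.tbl")
print(try_lock(conn, "db.tbl", "job-2"))  # True - free again
```

The same INSERT/DELETE pattern works against any JDBC database with a primary-key or unique constraint, which is what makes a relational store a natural default lock backend.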