> > We can make sure that the Tasks can tolerate concurrent runs, but as mentioned in the doc, in most cases having concurrent runs are a waste of resources, because of the commit conflicts.
>
> Is the problem that users may configure multiple jobs that are all trying to run maintenance procedures? If so, isn't this a user-level problem? If you don't want maintenance job conflicts, only run one maintenance job.
There are conflicts even when a single streaming job schedules the tasks. One example:

- The streaming job writes rows to an append-only table, and the Table Maintenance is scheduled in the PostCommitTopology
- DataFile rewrite is scheduled every X commits - to concatenate small files
- ManifestFile rewrite is scheduled every Y commits - to decrease the number of manifest files

A DataFile rewrite creates a new manifest file. This means that if a DataFile rewrite task finishes and commits while a ManifestFile rewrite is running concurrently, the ManifestFile rewrite will fail.

I have played around with serializing the Maintenance Tasks (the result was very ugly and hard-to-maintain, but working, code). This serialization mitigated some of the issues, but one problem remained: if there was a delay in executing the rewrites (job restart, resource starvation, a longer-than-expected Maintenance Task run), sometimes a subsequently scheduled DataFile/ManifestFile rewrite had already started while the previous one was still running. This again resulted in failures.

If we accept that locking is a requirement, then we can have:

- Simpler code for the flow - the flow becomes easy to understand
- Simpler code for the tasks - no need for separate messages for the different runs
- No failures in the logs - failures are problematic when swallowed, and make it hard to identify real issues when logged

> I don't think that an Iceberg solution is a good choice here. Coordination in a system that does work on Iceberg tables does not need to rely on locking in the Iceberg tables. It should have a way to coordinate externally. [..]

> I agree with Ryan that an Iceberg solution is not a good choice here.

I agree that we don't need to *rely* on locking in the Iceberg tables. The `TriggerLockFactory.Lock` interface is specifically designed to allow the user to choose the preferred type of locking.
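To make the idea concrete, here is a minimal sketch of the kind of pluggable try-lock such an interface allows, with a SQL table as the backing store. It is only an illustration of the pattern, not the actual Iceberg API: Python with sqlite3 stands in for a JDBC backend, and the class and table names are hypothetical. Acquisition is a single atomic INSERT on a primary key, so exactly one maintenance flow can hold the lock, and a concurrently triggered run can skip cleanly instead of failing at commit time.

```python
import sqlite3


class SqlLock:
    """Illustrative try-lock backed by a SQL table.

    Mirrors the shape of a pluggable lock: acquiring is an atomic
    INSERT on a primary key, so at most one holder exists at a time.
    """

    def __init__(self, conn, lock_id):
        self.conn = conn
        self.lock_id = lock_id
        conn.execute(
            "CREATE TABLE IF NOT EXISTS maintenance_lock (id TEXT PRIMARY KEY)"
        )
        conn.commit()

    def try_lock(self):
        # The unique-key constraint makes acquisition atomic: only one
        # caller's INSERT can succeed for a given lock id.
        try:
            self.conn.execute(
                "INSERT INTO maintenance_lock (id) VALUES (?)", (self.lock_id,)
            )
            self.conn.commit()
            return True
        except sqlite3.IntegrityError:
            self.conn.rollback()
            return False

    def is_held(self):
        row = self.conn.execute(
            "SELECT 1 FROM maintenance_lock WHERE id = ?", (self.lock_id,)
        ).fetchone()
        return row is not None

    def unlock(self):
        self.conn.execute(
            "DELETE FROM maintenance_lock WHERE id = ?", (self.lock_id,)
        )
        self.conn.commit()


conn = sqlite3.connect(":memory:")
lock = SqlLock(conn, "db.table")
assert lock.try_lock()      # first maintenance run acquires the lock
assert not lock.try_lock()  # a concurrent run skips instead of conflicting later
lock.unlock()
assert not lock.is_held()
```

A real implementation would also need an owner/expiry column so that a crashed job cannot hold the lock forever, but the acquisition mechanism stays the same.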
I was trying to come up with a solution where Apache Iceberg users don't need to rely on yet another external system for Flink Table Maintenance to work. I understand that this discussion is very similar to the HadoopCatalog situation, where we have a hacky "solution" which works in some cases but is suboptimal. And I understand if we don't want to support it.

If nobody else has a better idea, then I will add a default JDBC-based locking implementation to the PR [1].

Thanks,
Peter

[1] https://github.com/apache/iceberg/pull/10484

On Tue, Aug 6, 2024 at 6:25, Manu Zhang <owenzhang1...@gmail.com> wrote:

> Hi Peter,
>
> We rely on Airflow to schedule and coordinate maintenance Spark jobs. I agree with Ryan that an Iceberg solution is not a good choice here.
>
> Thanks,
> Manu
>
> On Tue, Aug 6, 2024 at 1:07 AM Ryan Blue <b...@databricks.com.invalid> wrote:
>
>> > We can make sure that the Tasks can tolerate concurrent runs, but as mentioned in the doc, in most cases having concurrent runs are a waste of resources, because of the commit conflicts.
>>
>> Is the problem that users may configure multiple jobs that are all trying to run maintenance procedures? If so, isn't this a user-level problem? If you don't want maintenance job conflicts, only run one maintenance job.
>>
>> > If Spark/Kafka/Flink users all need some kind of locking it might worth considering coming up with an Iceberg solution for it.
>>
>> I don't think that an Iceberg solution is a good choice here. Coordination in a system that does work on Iceberg tables does not need to rely on locking in the Iceberg tables. It should have a way to coordinate externally.
>>
>> On Sun, Aug 4, 2024 at 10:29 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>>
>>> Thanks everyone for your answers! I really appreciate it, especially since these come in during the weekend, using your own time.
>>>
>>> @Manu, during our initial discussion, you have mentioned that you had similar issues with Spark compactions. You needed locking there. Is it still an issue?
>>>
>>> If Spark/Kafka/Flink users all need some kind of locking it might worth considering coming up with an Iceberg solution for it. I see the following possibilities:
>>>
>>> 1. Create a locking feature using the current Catalog API: We could create a 'lock' instead of 'tag', and we could use the Catalogs atomic change requirement to make locking atomic. My main concern with this approach is that it relies on the linear history of the table and produces more contention in write side. I was OK with this in the very specific use-case with Flink Table Maintenance (few changes, controlled times), but as a general solution, I would look somewhere else.
>>> 2. Add locking to the Catalog API as a requirement: We could widen the Catalog API with a lock/unlock optional feature. I'm not sure that we see enough use cases to merit such a big change. OTOH this could be a good additional feature to the REST Catalog as well, and maybe something we would like to add to our Catalog sooner or later.
>>>
>>> About the specific case: if we want to integrate Flink Table Maintenance to the streaming Flink sinks - and I consider that one of the main use cases - we can't rely on external schedulers to prevent concurrent writes. We can make sure that the Tasks can tolerate concurrent runs, but as mentioned in the doc, in most cases having concurrent runs are a waste of resources, because of the commit conflicts.
>>>
>>> If we decide to pursue the Flink only solution, I would implement a JDBC based locking implementation for the LockFactory interface based on the feedback. This seems like the most natural and available external storage for locks. Later we could add more implementations based on the user needs.
>>>
>>> Thanks for the feedback and discussion,
>>> Peter
>>>
>>> On Mon, Aug 5, 2024, 05:18 Manu Zhang <owenzhang1...@gmail.com> wrote:
>>>
>>>> Not familiar with Flink, I'm wondering how Flink resolves concurrency issues in common Flink use cases. For example, how does Flink prevent two jobs from writing to the same file?
>>>>
>>>> On the other hand, an Iceberg tag is eventually an atomic change to a file. It's the same as using a file lock. I don't think we can handle such cases without relying on external dependencies. We may define interfaces in Iceberg and let users choose whatever services they can use to implement it. My $0.01.
>>>>
>>>> Manu
>>>>
>>>> On Mon, Aug 5, 2024 at 8:38 AM Steven Wu <stevenz...@gmail.com> wrote:
>>>>
>>>>> I also don't feel it is the best fit to use tags to implement locks for passing control messages. This is the main sticking point for me from the design doc. However, we haven't been able to come up with a better solution yet. Maybe we need to go back to the drawing board again.
>>>>>
>>>>> I am also not sure using Kafka topics to send control messages is a good fit either. It would introduce a dependency on Kafka to run Flink maintenance jobs. It works for Kafka connect sink, because that is for Kafka env anyway.
>>>>>
>>>>> On Sun, Aug 4, 2024 at 1:12 PM Ryan Blue <b...@databricks.com.invalid> wrote:
>>>>>
>>>>>> Hi Péter, thanks for bringing this up.
>>>>>>
>>>>>> I don't think using a tag to "lock" a table is a good idea. The doc calls out that this is necessary "Since Flink doesn’t provide an out of the box solution for downstream operators sending feedback to upstream operators" so this feels like using Iceberg metadata as a side-channel within a Flink application. That doesn't seem like a good idea to me.
>>>>>>
>>>>>> Why not use a separate Kafka topic to send control messages for this purpose, like what is done in the Kafka Connect sink? I think that's a cleaner way to solve the problem if there is not going to be a way to fix it in Flink.
>>>>>>
>>>>>> Ryan
>>>>>>
>>>>>> On Wed, Jul 31, 2024 at 7:45 AM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Team,
>>>>>>>
>>>>>>> During the discussion around the Flink Table Maintenance [1], [2], I have highlighted that one of the main decision points is the way we prevent concurrent Maintenance Tasks from happening concurrently.
>>>>>>>
>>>>>>> At that time we did not find better solution than providing an interface for locking, and provide a basic implementation which is based on Iceberg tags [3]:
>>>>>>>
>>>>>>> *"For convenience an Iceberg tag based solution is provided, so no external dependencies are needed. On lock creation a tag named '__flink_maitenance' is created for the current snapshot, and on lock removal this tag is removed. This solution is not ideal, as it creates multiple new versions of the table metadata, and mixes infrastructural metadata with user metadata. If this is not acceptable then other implementations could be created which could use external components like JDBC/Kafka/ZooKeeper."*
>>>>>>>
>>>>>>> We are in the implementation phase, and we agreed with Steven to take one final round with the community, to see if anyone has a better suggestion, or we could proceed with the originally agreed one.
>>>>>>>
>>>>>>> So if you have a better idea, please share.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Peter
>>>>>>>
>>>>>>> [1] - https://lists.apache.org/thread/qjf83v2xj6lxb9sr8z0v9p0979f8wsmf
>>>>>>> [2] - https://lists.apache.org/thread/vjf8m5wg840o58yz4y3q35k2mfhbm49l
>>>>>>> [3] - https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit#heading=h.lt9eaimi6zyz
>>>>>>
>>>>>> --
>>>>>> Ryan Blue
>>>>>> Databricks
>>
>> --
>> Ryan Blue
>> Databricks