> DataFile rewrite will create a new manifest file. This means that if a DataFile rewrite task is finished and committed while there is a concurrent ManifestFile rewrite, the ManifestFile rewrite will fail. I have played around with serializing the Maintenance Tasks (which resulted in very ugly, hard-to-maintain, but working code). This serialization mitigated some of the issues, but one problem remained: if there was a delay in executing the rewrites (job restart, resource deprivation, or a longer-than-expected Maintenance Task run), a consecutively scheduled DataFile/ManifestFile rewrite was sometimes already started while the previous DataFile/ManifestFile rewrite was still running. This again resulted in failures.
If all the maintenance tasks are created from a single Flink job, is it possible to simply skip a new maintenance task if there is already a running one? Could the running maintenance tasks be recorded in the JM (JobManager)?

> On Aug 6, 2024, at 17:27, Péter Váry <peter.vary.apa...@gmail.com> wrote:
>
> > > We can make sure that the Tasks can tolerate concurrent runs, but as mentioned in the doc, in most cases having concurrent runs is a waste of resources because of the commit conflicts.
> >
> > Is the problem that users may configure multiple jobs that are all trying to run maintenance procedures? If so, isn't this a user-level problem? If you don't want maintenance job conflicts, only run one maintenance job.
>
> There are conflicts even when a single streaming job schedules the tasks. One example:
> - The streaming job writes rows to an append-only table, and the Table Maintenance is scheduled in the PostCommitTopology
> - DataFile rewrite is scheduled every X commits - to concatenate small files
> - ManifestFile rewrite is scheduled every Y commits - to decrease the number of manifest files
>
> DataFile rewrite will create a new manifest file. This means that if a DataFile rewrite task is finished and committed while there is a concurrent ManifestFile rewrite, the ManifestFile rewrite will fail. I have played around with serializing the Maintenance Tasks (which resulted in very ugly, hard-to-maintain, but working code). This serialization mitigated some of the issues, but one problem remained: if there was a delay in executing the rewrites (job restart, resource deprivation, or a longer-than-expected Maintenance Task run), a consecutively scheduled DataFile/ManifestFile rewrite was sometimes already started while the previous DataFile/ManifestFile rewrite was still running. This again resulted in failures.
>
> If we accept that locking is a requirement, then we can have:
> - Simpler code for the flow - an easy to understand flow
> - Simpler code for the tasks - no need to separate messages for the different runs
> - No failures in the logs - these are problematic when swallowed, and make it hard to identify real issues when logged
>
> > I don't think that an Iceberg solution is a good choice here. Coordination in a system that does work on Iceberg tables does not need to rely on locking in the Iceberg tables. It should have a way to coordinate externally.
> [..]
>
> I agree with Ryan that an Iceberg solution is not a good choice here.
>
> I agree that we don't need to rely on locking in the Iceberg tables. The `TriggerLockFactory.Lock` interface is specifically designed to allow the user to choose the preferred type of locking. I was trying to come up with a solution where Apache Iceberg users don't need to rely on one more external system for Flink Table Maintenance to work.
>
> I understand that this discussion is very similar to the HadoopCatalog situation, where we have a hacky "solution" which works in some cases but is suboptimal. And I understand if we don't want to support it.
>
> If nobody else has a better idea, then I will add a default JDBC based locking implementation to the PR [1].
>
> Thanks,
> Peter
>
> [1] https://github.com/apache/iceberg/pull/10484
>
> On Tue, Aug 6, 2024 at 6:25, Manu Zhang <owenzhang1...@gmail.com> wrote:
>> Hi Peter,
>>
>> We rely on Airflow to schedule and coordinate maintenance Spark jobs.
>> I agree with Ryan that an Iceberg solution is not a good choice here.
>>
>> Thanks,
>> Manu
>>
>> On Tue, Aug 6, 2024 at 1:07 AM Ryan Blue <b...@databricks.com.invalid> wrote:
>>> > We can make sure that the Tasks can tolerate concurrent runs, but as mentioned in the doc, in most cases having concurrent runs is a waste of resources because of the commit conflicts.
>>>
>>> Is the problem that users may configure multiple jobs that are all trying to run maintenance procedures? If so, isn't this a user-level problem? If you don't want maintenance job conflicts, only run one maintenance job.
>>>
>>> > If Spark/Kafka/Flink users all need some kind of locking, it might be worth considering coming up with an Iceberg solution for it.
>>>
>>> I don't think that an Iceberg solution is a good choice here. Coordination in a system that does work on Iceberg tables does not need to rely on locking in the Iceberg tables. It should have a way to coordinate externally.
>>>
>>> On Sun, Aug 4, 2024 at 10:29 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>>>> Thanks everyone for your answers! I really appreciate it, especially since these came in during the weekend, on your own time.
>>>>
>>>> @Manu, during our initial discussion you mentioned that you had similar issues with Spark compactions, and that you needed locking there. Is it still an issue?
>>>>
>>>> If Spark/Kafka/Flink users all need some kind of locking, it might be worth considering coming up with an Iceberg solution for it. I see the following possibilities:
>>>> 1. Create a locking feature using the current Catalog API: We could create a 'lock' instead of a 'tag', and use the Catalog's atomic change requirement to make locking atomic. My main concern with this approach is that it relies on the linear history of the table and produces more contention on the write side. I was OK with this in the very specific use case of Flink Table Maintenance (few changes, controlled times), but as a general solution I would look somewhere else.
>>>> 2. Add locking to the Catalog API as a requirement: We could widen the Catalog API with an optional lock/unlock feature. I'm not sure that we see enough use cases to merit such a big change. OTOH this could be a good additional feature for the REST Catalog as well, and maybe something we would like to add to our Catalog sooner or later.
>>>>
>>>> About the specific case: if we want to integrate Flink Table Maintenance into the streaming Flink sinks - and I consider that one of the main use cases - we can't rely on external schedulers to prevent concurrent writes. We can make sure that the Tasks can tolerate concurrent runs, but as mentioned in the doc, in most cases having concurrent runs is a waste of resources because of the commit conflicts.
>>>>
>>>> If we decide to pursue the Flink-only solution, I would implement a JDBC based locking implementation for the LockFactory interface, based on the feedback. This seems like the most natural and widely available external storage for locks. Later we could add more implementations based on user needs.
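For illustration, below is a minimal sketch of the JDBC-backed lock idea mentioned above: a single row per lock, acquired with an INSERT whose primary-key constraint makes the acquisition atomic, and released with a DELETE. The table layout, the class name, and the tryLock/isHeld/unlock shape are assumptions for this sketch, not the actual `TriggerLockFactory.Lock` contract from https://github.com/apache/iceberg/pull/10484.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

// Sketch only: one row per lock, acquired by INSERT and released by DELETE.
// The primary key on lock_id is what makes acquisition atomic on the database side.
public class JdbcMaintenanceLock {
  private static final String CREATE_TABLE =
      "CREATE TABLE IF NOT EXISTS maintenance_lock ("
          + "lock_id VARCHAR(255) PRIMARY KEY, acquired_at TIMESTAMP)";
  private static final String ACQUIRE =
      "INSERT INTO maintenance_lock (lock_id, acquired_at) VALUES (?, CURRENT_TIMESTAMP)";
  private static final String CHECK = "SELECT 1 FROM maintenance_lock WHERE lock_id = ?";
  private static final String RELEASE = "DELETE FROM maintenance_lock WHERE lock_id = ?";

  private final String jdbcUrl;
  private final String lockId;

  public JdbcMaintenanceLock(String jdbcUrl, String lockId) {
    this.jdbcUrl = jdbcUrl;
    this.lockId = lockId;
  }

  // Creates the lock table if it does not exist yet.
  public void init() throws SQLException {
    try (Connection conn = DriverManager.getConnection(jdbcUrl);
        PreparedStatement stmt = conn.prepareStatement(CREATE_TABLE)) {
      stmt.execute();
    }
  }

  // Returns true if the lock was acquired. A real implementation should inspect the
  // SQLState for a unique-constraint violation instead of catching broadly.
  public boolean tryLock() {
    try (Connection conn = DriverManager.getConnection(jdbcUrl);
        PreparedStatement stmt = conn.prepareStatement(ACQUIRE)) {
      stmt.setString(1, lockId);
      stmt.executeUpdate();
      return true;
    } catch (SQLException e) {
      // Most likely the row already exists: another maintenance run holds the lock.
      return false;
    }
  }

  // Returns true if the lock row is currently present.
  public boolean isHeld() throws SQLException {
    try (Connection conn = DriverManager.getConnection(jdbcUrl);
        PreparedStatement stmt = conn.prepareStatement(CHECK)) {
      stmt.setString(1, lockId);
      try (ResultSet rs = stmt.executeQuery()) {
        return rs.next();
      }
    }
  }

  // Releases the lock by deleting its row.
  public void unlock() throws SQLException {
    try (Connection conn = DriverManager.getConnection(jdbcUrl);
        PreparedStatement stmt = conn.prepareStatement(RELEASE)) {
      stmt.setString(1, lockId);
      stmt.executeUpdate();
    }
  }
}

A runner would call tryLock() before triggering a maintenance task and unlock() after its commit succeeds; recovering a stale lock after a job crash (for example via a timeout on acquired_at) is left out of this sketch.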
>>>> Thanks for the feedback and discussion,
>>>> Peter
>>>>
>>>> On Mon, Aug 5, 2024, 05:18 Manu Zhang <owenzhang1...@gmail.com> wrote:
>>>>> I'm not familiar with Flink, so I'm wondering how Flink resolves concurrency issues in common Flink use cases. For example, how does Flink prevent two jobs from writing to the same file?
>>>>>
>>>>> On the other hand, an Iceberg tag is eventually an atomic change to a file. It's the same as using a file lock. I don't think we can handle such cases without relying on external dependencies. We may define interfaces in Iceberg and let users choose whatever services they can use to implement them. My $0.01.
>>>>>
>>>>> Manu
>>>>>
>>>>> On Mon, Aug 5, 2024 at 8:38 AM Steven Wu <stevenz...@gmail.com> wrote:
>>>>>> I also don't feel it is the best fit to use tags to implement locks for passing control messages. This is the main sticking point for me from the design doc. However, we haven't been able to come up with a better solution yet. Maybe we need to go back to the drawing board again.
>>>>>>
>>>>>> I am also not sure that using Kafka topics to send control messages is a good fit either. It would introduce a dependency on Kafka to run Flink maintenance jobs. It works for the Kafka Connect sink, because that is for a Kafka environment anyway.
>>>>>>
>>>>>> On Sun, Aug 4, 2024 at 1:12 PM Ryan Blue <b...@databricks.com.invalid> wrote:
>>>>>>> Hi Péter, thanks for bringing this up.
>>>>>>>
>>>>>>> I don't think using a tag to "lock" a table is a good idea. The doc calls out that this is necessary "Since Flink doesn't provide an out of the box solution for downstream operators sending feedback to upstream operators", so this feels like using Iceberg metadata as a side channel within a Flink application. That doesn't seem like a good idea to me.
>>>>>>>
>>>>>>> Why not use a separate Kafka topic to send control messages for this purpose, like what is done in the Kafka Connect sink? I think that's a cleaner way to solve the problem if there is not going to be a way to fix it in Flink.
>>>>>>>
>>>>>>> Ryan
>>>>>>>
>>>>>>> On Wed, Jul 31, 2024 at 7:45 AM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>>>>>>>> Hi Team,
>>>>>>>>
>>>>>>>> During the discussion around Flink Table Maintenance [1], [2], I highlighted that one of the main decision points is how we prevent Maintenance Tasks from running concurrently.
>>>>>>>>
>>>>>>>> At that time we did not find a better solution than providing an interface for locking, along with a basic implementation based on Iceberg tags [3]:
>>>>>>>>
>>>>>>>> "For convenience an Iceberg tag based solution is provided, so no external dependencies are needed. On lock creation a tag named '__flink_maitenance' is created for the current snapshot, and on lock removal this tag is removed. This solution is not ideal, as it creates multiple new versions of the table metadata, and mixes infrastructural metadata with user metadata. If this is not acceptable then other implementations could be created which could use external components like JDBC/Kafka/ZooKeeper."
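For illustration, below is a minimal sketch of the tag-based lock described in the quoted paragraph, written against the public Iceberg Table API (manageSnapshots().createTag()/removeTag() and refs()). The class name, the tryLock/isHeld/unlock shape, and the error handling around concurrent tag creation are assumptions of this sketch, not the implementation from the PR; the tag name is spelled as it appears in the quoted doc.

import org.apache.iceberg.Table;

// Sketch only: the presence of the tag marks the lock as held; the catalog's
// atomic metadata swap is what makes creating and removing the tag safe.
public class TagMaintenanceLock {
  // Tag name as quoted from the design doc.
  private static final String LOCK_TAG = "__flink_maitenance";

  private final Table table;

  public TagMaintenanceLock(Table table) {
    this.table = table;
  }

  // Tries to create the lock tag on the current snapshot. Returns false if the tag
  // already exists or the commit fails because a concurrent run created it first.
  public boolean tryLock() {
    table.refresh();
    if (table.currentSnapshot() == null || table.refs().containsKey(LOCK_TAG)) {
      // No snapshot to tag yet, or another maintenance run already holds the lock.
      return false;
    }
    try {
      table.manageSnapshots()
          .createTag(LOCK_TAG, table.currentSnapshot().snapshotId())
          .commit();
      return true;
    } catch (RuntimeException e) {
      // Concurrent tag creation or commit conflict: treat the lock as held.
      return false;
    }
  }

  // Whether the lock tag currently exists.
  public boolean isHeld() {
    table.refresh();
    return table.refs().containsKey(LOCK_TAG);
  }

  // Removes the lock tag, releasing the lock (and creating one more metadata version).
  public void unlock() {
    table.manageSnapshots().removeTag(LOCK_TAG).commit();
  }
}

Every tryLock()/unlock() pair commits new table metadata and surfaces the lock tag next to user-created tags, which is exactly the downside called out in the quoted doc and in the replies above.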
>>>>>>>> We are in the implementation phase, and Steven and I agreed to take one final round with the community, to see if anyone has a better suggestion, or whether we can proceed with the originally agreed approach.
>>>>>>>>
>>>>>>>> So if you have a better idea, please share.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Peter
>>>>>>>>
>>>>>>>> [1] - https://lists.apache.org/thread/qjf83v2xj6lxb9sr8z0v9p0979f8wsmf
>>>>>>>> [2] - https://lists.apache.org/thread/vjf8m5wg840o58yz4y3q35k2mfhbm49l
>>>>>>>> [3] - https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit#heading=h.lt9eaimi6zyz
>>>>>>>
>>>>>>> --
>>>>>>> Ryan Blue
>>>>>>> Databricks
>>>
>>> --
>>> Ryan Blue
>>> Databricks