> DataFile rewrite will create a new manifest file. This means that if a 
> DataFile rewrite task finishes and commits while there is a concurrent 
> ManifestFile rewrite, then the ManifestFile rewrite will fail. I have 
> played around with serializing the Maintenance Tasks (this resulted in 
> very ugly, hard-to-maintain, but working code). This serialization 
> mitigated some of the issues, but one problem remained: if there was a 
> delay in executing the rewrites (job restart, resource starvation, or a 
> longer-than-expected Maintenance Task run), sometimes a subsequently 
> scheduled DataFile/ManifestFile rewrite had already started while the 
> previous one was still running. This again resulted in failures.

If all the maintenance tasks are created from a single Flink job, is it 
possible to simply skip a new maintenance task if there is already a running 
task? The running maintenance tasks could be recorded in the JM (JobManager)? 
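
Something like this hypothetical guard on the scheduling side is what I have 
in mind (all names are made up for illustration, not from the actual PR):

    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    // Hypothetical sketch: skip a newly triggered maintenance task while a
    // previous run of the same task is still in flight.
    class MaintenanceGuard {
      private final Set<String> running = ConcurrentHashMap.newKeySet();

      void maybeRun(String taskName, Runnable task) {
        if (!running.add(taskName)) {
          return; // previous run still in progress; skip this trigger
        }
        try {
          task.run();
        } finally {
          running.remove(taskName);
        }
      }
    }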

> On Aug 6, 2024, at 17:27, Péter Váry <peter.vary.apa...@gmail.com> wrote:
> 
> > > We can make sure that the Tasks can tolerate concurrent runs, but as 
> > > mentioned in the doc, in most cases having concurrent runs is a waste of 
> > > resources, because of the commit conflicts.
> >
> > Is the problem that users may configure multiple jobs that are all trying 
> > to run maintenance procedures? If so, isn't this a user-level problem? If 
> > you don't want maintenance job conflicts, only run one maintenance job.
> 
> There are conflicts even when a single streaming job schedules the tasks.
> One example:
> - Streaming job writes rows to an append-only table, and the Table 
> Maintenance is scheduled in the PostCommitTopology
> - DataFile rewrite is scheduled on X commits - to concatenate small files 
> - ManifestFile rewrite is scheduled on Y commits - to decrease the number of 
> manifest files
> 
> DataFile rewrite will create a new manifest file. This means that if a 
> DataFile rewrite task finishes and commits while there is a concurrent 
> ManifestFile rewrite, then the ManifestFile rewrite will fail. I have 
> played around with serializing the Maintenance Tasks (this resulted in 
> very ugly, hard-to-maintain, but working code). This serialization 
> mitigated some of the issues, but one problem remained: if there was a 
> delay in executing the rewrites (job restart, resource starvation, or a 
> longer-than-expected Maintenance Task run), sometimes a subsequently 
> scheduled DataFile/ManifestFile rewrite had already started while the 
> previous one was still running. This again resulted in failures.
> 
> If we accept that locking is a requirement, then we can have:
> - Simpler code for the flow - easy to understand flow
> - Simpler code for the tasks - no need to separate messages for the different 
> runs
> - No failures in the logs - such failures are problematic when swallowed, 
> and make it hard to identify real issues when logged
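> 
> For illustration, the lock-guarded scheduling flow I have in mind looks 
> roughly like this (emitTrigger/deferTrigger are hypothetical names):
> 
>     // Hypothetical sketch: only fire a maintenance task when the shared
>     // lock can be taken; otherwise retry on the next evaluation.
>     if (lock.tryLock()) {
>       emitTrigger(taskId);  // the task releases the lock when it finishes
>     } else {
>       deferTrigger(taskId); // re-evaluate on the next commit/timer
>     }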
> 
> > I don't think that an Iceberg solution is a good choice here. Coordination 
> > in a system that does work on Iceberg tables does not need to rely on 
> > locking in the Iceberg tables. It should have a way to coordinate 
> > externally.
> [..]
> > I agree with Ryan that an Iceberg solution is not a good choice here.
> 
> I agree that we don't need to rely on locking in the Iceberg tables. The 
> `TriggerLockFactory.Lock` interface is specifically designed to allow the 
> user to choose the preferred type of locking. I was trying to come up with a 
> solution where the Apache Iceberg users don't need to rely on one more 
> external system for the Flink Table Maintenance to work.
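> 
> For reference, the interface is shaped roughly like this (a simplified 
> sketch from memory; see the PR [1] for the actual code):
> 
>     import java.io.Closeable;
>     import java.io.Serializable;
> 
>     public interface TriggerLockFactory extends Serializable, Closeable {
>       void open();
>       Lock createLock();
> 
>       interface Lock {
>         boolean tryLock(); // false if the lock is already held
>         boolean isHeld();
>         void unlock();
>       }
>     }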
> 
> I understand that this discussion is very similar to the HadoopCatalog 
> situation, where we have a hacky "solution" which works in some cases, but 
> is suboptimal. And I understand if we don't want to support it.
> 
> If nobody else has a better idea, then I will add a default JDBC-based 
> locking implementation to the PR [1].
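> 
> The rough idea for the JDBC lock - acquire by an atomic INSERT into a lock 
> table, release by a DELETE - would look something like this (the table name 
> and schema are placeholders, not the final implementation):
> 
>     import java.sql.Connection;
>     import java.sql.PreparedStatement;
>     import java.sql.SQLException;
>     import java.sql.SQLIntegrityConstraintViolationException;
> 
>     // Placeholder sketch of a JDBC-backed lock, relying on the primary
>     // key constraint of the lock table for atomicity.
>     class JdbcLock {
>       boolean tryLock(Connection conn, String tableName) throws SQLException {
>         try (PreparedStatement ps = conn.prepareStatement(
>             "INSERT INTO maintenance_lock (table_name) VALUES (?)")) {
>           ps.setString(1, tableName);
>           ps.executeUpdate();
>           return true;
>         } catch (SQLIntegrityConstraintViolationException e) {
>           return false; // another job already holds the lock
>         }
>       }
> 
>       void unlock(Connection conn, String tableName) throws SQLException {
>         try (PreparedStatement ps = conn.prepareStatement(
>             "DELETE FROM maintenance_lock WHERE table_name = ?")) {
>           ps.setString(1, tableName);
>           ps.executeUpdate();
>         }
>       }
>     }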
> 
> Thanks,
> Peter
> 
> [1] https://github.com/apache/iceberg/pull/10484
> 
> Manu Zhang <owenzhang1...@gmail.com> wrote (on Tue, Aug 6, 2024, 6:25):
>> Hi Peter,
>> 
>> We rely on Airflow to schedule and coordinate maintenance Spark jobs. I 
>> agree with Ryan that an Iceberg solution is not a good choice here.
>> 
>> Thanks,
>> Manu
>> 
>> On Tue, Aug 6, 2024 at 1:07 AM Ryan Blue <b...@databricks.com.invalid> wrote:
>>> > We can make sure that the Tasks can tolerate concurrent runs, but as 
>>> > mentioned in the doc, in most cases having concurrent runs is a waste of 
>>> > resources, because of the commit conflicts.
>>> 
>>> Is the problem that users may configure multiple jobs that are all trying 
>>> to run maintenance procedures? If so, isn't this a user-level problem? If 
>>> you don't want maintenance job conflicts, only run one maintenance job.
>>> 
>>> > If Spark/Kafka/Flink users all need some kind of locking, it might be 
>>> > worth considering coming up with an Iceberg solution for it.
>>> 
>>> I don't think that an Iceberg solution is a good choice here. Coordination 
>>> in a system that does work on Iceberg tables does not need to rely on 
>>> locking in the Iceberg tables. It should have a way to coordinate 
>>> externally.
>>> 
>>> On Sun, Aug 4, 2024 at 10:29 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>>>> Thanks everyone for your answers! I really appreciate it, especially 
>>>> since these came in during the weekend, on your own time.
>>>> 
>>>> @Manu, during our initial discussion, you mentioned that you had similar 
>>>> issues with Spark compactions, and that you needed locking there. Is it 
>>>> still an issue?
>>>> 
>>>> If Spark/Kafka/Flink users all need some kind of locking, it might be 
>>>> worth considering coming up with an Iceberg solution for it.
>>>> I see the following possibilities:
>>>> 1. Create a locking feature using the current Catalog API: We could 
>>>> create a 'lock' instead of a 'tag', and use the Catalog's atomic change 
>>>> requirement to make locking atomic (see the sketch after this list). My 
>>>> main concern with this approach is that it relies on the linear history 
>>>> of the table and produces more contention on the write side. I was OK 
>>>> with this in the very specific use case of Flink Table Maintenance (few 
>>>> changes, controlled times), but as a general solution, I would look 
>>>> somewhere else.
>>>> 2. Add locking to the Catalog API as a requirement: We could widen the 
>>>> Catalog API with an optional lock/unlock feature. I'm not sure that we 
>>>> see enough use cases to merit such a big change. OTOH this could be a 
>>>> good addition to the REST Catalog as well, and maybe something we would 
>>>> like to add to our Catalog sooner or later.
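>>>> 
>>>> The tag-based locking in option 1 could look roughly like this (a sketch 
>>>> only; the tag name is illustrative):
>>>> 
>>>>     import org.apache.iceberg.Table;
>>>>     import org.apache.iceberg.exceptions.CommitFailedException;
>>>> 
>>>>     // Sketch of option 1: a tag as a lock, relying on the catalog's
>>>>     // atomic commit requirement.
>>>>     class TagLock {
>>>>       boolean tryLock(Table table) {
>>>>         try {
>>>>           table.manageSnapshots()
>>>>               .createTag("__lock", table.currentSnapshot().snapshotId())
>>>>               .commit();
>>>>           return true;
>>>>         } catch (CommitFailedException | IllegalArgumentException e) {
>>>>           return false; // tag already exists or a concurrent commit won
>>>>         }
>>>>       }
>>>> 
>>>>       void unlock(Table table) {
>>>>         table.manageSnapshots().removeTag("__lock").commit();
>>>>       }
>>>>     }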
>>>> 
>>>> About the specific case: if we want to integrate Flink Table Maintenance 
>>>> into the streaming Flink sinks - and I consider that one of the main use 
>>>> cases - we can't rely on external schedulers to prevent concurrent 
>>>> writes. We can make sure that the Tasks can tolerate concurrent runs, 
>>>> but as mentioned in the doc, in most cases having concurrent runs is a 
>>>> waste of resources, because of the commit conflicts.
>>>> 
>>>> If we decide to pursue the Flink-only solution, I would provide a 
>>>> JDBC-based locking implementation for the LockFactory interface, based 
>>>> on the feedback. This seems like the most natural and widely available 
>>>> external storage for locks. Later we could add more implementations 
>>>> based on user needs.
>>>> 
>>>> Thanks for the feedback and discussion,
>>>> Peter
>>>> 
>>>> On Mon, Aug 5, 2024, 05:18 Manu Zhang <owenzhang1...@gmail.com> wrote:
>>>>> Not being familiar with Flink, I'm wondering how Flink resolves 
>>>>> concurrency issues in common Flink use cases. For example, how does 
>>>>> Flink prevent two jobs from writing to the same file?
>>>>> 
>>>>> On the other hand, an Iceberg tag is ultimately an atomic change to a 
>>>>> file, so it's the same as using a file lock. I don't think we can 
>>>>> handle such cases without relying on external dependencies. We may 
>>>>> define interfaces in Iceberg and let users implement them with whatever 
>>>>> services they have available. My $0.01.
>>>>> 
>>>>> Manu
>>>>> 
>>>>> 
>>>>> On Mon, Aug 5, 2024 at 8:38 AM Steven Wu <stevenz...@gmail.com> wrote:
>>>>>> I also don't feel that using tags to implement locks for passing 
>>>>>> control messages is the best fit. This is the main sticking point for 
>>>>>> me in the design doc. However, we haven't been able to come up with a 
>>>>>> better solution yet. Maybe we need to go back to the drawing board.
>>>>>> 
>>>>>> I am also not sure that using Kafka topics to send control messages is 
>>>>>> a good fit either. It would introduce a dependency on Kafka to run 
>>>>>> Flink maintenance jobs. It works for the Kafka Connect sink, because 
>>>>>> that is in a Kafka environment anyway.
>>>>>> 
>>>>>> 
>>>>>> On Sun, Aug 4, 2024 at 1:12 PM Ryan Blue <b...@databricks.com.invalid> 
>>>>>> wrote:
>>>>>>>  Hi Péter, thanks for bringing this up.
>>>>>>> 
>>>>>>> I don't think using a tag to "lock" a table is a good idea. The doc 
>>>>>>> calls out that this is necessary "Since Flink doesn’t provide an out of 
>>>>>>> the box solution for downstream operators sending feedback to upstream 
>>>>>>> operators" so this feels like using Iceberg metadata as a side-channel 
>>>>>>> within a Flink application. That doesn't seem like a good idea to me.
>>>>>>> 
>>>>>>> Why not use a separate Kafka topic to send control messages for this 
>>>>>>> purpose, like what is done in the Kafka Connect sink? I think that's a 
>>>>>>> cleaner way to solve the problem if there is not going to be a way to 
>>>>>>> fix it in Flink.
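>>>>>>> 
>>>>>>> Concretely, I'm imagining something like this (the topic name and 
>>>>>>> message format are made up for illustration):
>>>>>>> 
>>>>>>>     import java.util.Properties;
>>>>>>>     import org.apache.kafka.clients.producer.KafkaProducer;
>>>>>>>     import org.apache.kafka.clients.producer.ProducerRecord;
>>>>>>> 
>>>>>>>     // Illustrative fragment: publish a control event around a
>>>>>>>     // maintenance run; schedulers consume the topic to coordinate.
>>>>>>>     Properties props = new Properties();
>>>>>>>     props.put("bootstrap.servers", "broker:9092");
>>>>>>>     props.put("key.serializer",
>>>>>>>         "org.apache.kafka.common.serialization.StringSerializer");
>>>>>>>     props.put("value.serializer",
>>>>>>>         "org.apache.kafka.common.serialization.StringSerializer");
>>>>>>>     try (KafkaProducer<String, String> producer =
>>>>>>>         new KafkaProducer<>(props)) {
>>>>>>>       producer.send(new ProducerRecord<>(
>>>>>>>           "maintenance-control", "db.table", "REWRITE_DATA_FILES:START"));
>>>>>>>     }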
>>>>>>> 
>>>>>>> Ryan
>>>>>>> 
>>>>>>> On Wed, Jul 31, 2024 at 7:45 AM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>>>>>>>> Hi Team,
>>>>>>>> 
>>>>>>>> During the discussion around the Flink Table Maintenance [1], [2], I 
>>>>>>>> highlighted that one of the main decision points is how we prevent 
>>>>>>>> Maintenance Tasks from running concurrently.
>>>>>>>> 
>>>>>>>> At that time we did not find a better solution than providing an 
>>>>>>>> interface for locking, along with a basic implementation based on 
>>>>>>>> Iceberg tags [3]:
>>>>>>>> 
>>>>>>>> "For convenience an Iceberg tag based solution is provided, so no 
>>>>>>>> external dependencies are needed. On lock creation a tag named 
>>>>>>>> '__flink_maitenance' is created for the current snapshot, and on lock 
>>>>>>>> removal this tag is removed. This solution is not ideal, as it creates 
>>>>>>>> multiple new versions of the table metadata, and mixes infrastructural 
>>>>>>>> metadata with user metadata. If this is not acceptable then other 
>>>>>>>> implementations could be created which could use external components 
>>>>>>>> like JDBC/Kafka/ZooKeeper."
>>>>>>>> 
>>>>>>>> We are in the implementation phase, and Steven and I agreed to take 
>>>>>>>> one final round with the community, to see if anyone has a better 
>>>>>>>> suggestion, before proceeding with the originally agreed approach.
>>>>>>>> 
>>>>>>>> So if you have a better idea, please share.
>>>>>>>> 
>>>>>>>> Thanks,
>>>>>>>> Peter
>>>>>>>> 
>>>>>>>> [1] - https://lists.apache.org/thread/qjf83v2xj6lxb9sr8z0v9p0979f8wsmf
>>>>>>>> [2] - https://lists.apache.org/thread/vjf8m5wg840o58yz4y3q35k2mfhbm49l
>>>>>>>> [3] - 
>>>>>>>> https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit#heading=h.lt9eaimi6zyz
>>>>>>> 
>>>>>>> 
>>>>>>> --
>>>>>>> Ryan Blue
>>>>>>> Databricks
>>> 
>>> 
>>> --
>>> Ryan Blue
>>> Databricks
