> If nobody else has a better idea, then I will add a default JDBC based
locking implementation to the PR

Do you mean an implementation of `LockManager` in core or something
specific to this Flink application?

On Tue, Aug 6, 2024 at 2:28 AM Péter Váry <peter.vary.apa...@gmail.com>
wrote:

> > > We can make sure that the Tasks can tolerate concurrent runs, but as
> mentioned in the doc, in most cases having concurrent runs are a waste of
> resources, because of the commit conflicts.
> >
> > Is the problem that users may configure multiple jobs that are all
> trying to run maintenance procedures? If so, isn't this a user-level
> problem? If you don't want maintenance job conflicts, only run one
> maintenance job.
>
> There are conflicts even when a single streaming job schedules the tasks.
> One example:
> - Streaming job writes rows to an append only table, and the Table
> Maintenance is scheduled in the PostCommitTopology
> - DataFile rewrite is scheduled on X commits - to concatenate small files
> - ManifestFile rewrite is scheduled on Y commits - to decrease the number
> of manifest files
>
> DataFile rewrite will create a new manifest file. This means if a DataFile
> rewrite task is finished and committed, and there is a concurrent
> ManifestFile rewrite then the ManifestFile rewrite will fail. I have played
> around with serializing the Maintenance Tasks (resulted in a very ugly/hard
> to maintain, but working code). This serialization mitigated some of the
> issues, but there was still one remaining problem. If there was a delay in
> executing the rewrites (job restart/resource deprivation/longer than
> expected Maintenance Task run), sometimes a consecutively scheduled
> DataFile/ManifestFile rewrite is already started when the previous
> DataFile/ManifestFile rewrite was still working. This again resulted in
> failures.
>
> If we accept that locking is a requirement, then we can have:
> - Simpler code for the flow - easy to understand flow
> - Simpler code for the tasks - no need to separate messages for the
> different runs
> - No failures in the logs - problematic when swallowed, making hard to
> identify real issues when logged
>
> > I don't think that an Iceberg solution is a good choice here.
> Coordination in a system that does work on Iceberg tables does not need to
> rely on locking in the Iceberg tables. It should have a way to coordinate
> externally.
> [..]
> > I agree with Ryan that an Iceberg solution is not a good choice here.
>
> I agree that we don't need to *rely* on locking in the Iceberg tables.
> The `TriggerLockFactory.Lock` interface is specifically designed to allow
> the user to choose the prefered type of locking. I was trying to come up
> with a solution where the Apache Iceberg users don't need to rely on one
> more external system for the Flink Table Maintenance to work.
>
> I understand that this discussion is very similar to the HadoopCatalog
> situation, where we have a hacky "solution" which is working in some cases,
> but suboptimal. And I understand if we don't want to support it.
>
> If nobody else has a better idea, then I will add a default JDBC based
> locking implementation to the PR [1].
>
> Thanks,
> Peter
>
> [1] https://github.com/apache/iceberg/pull/10484
>
> Manu Zhang <owenzhang1...@gmail.com> ezt írta (időpont: 2024. aug. 6., K,
> 6:25):
>
>> Hi Peter,
>>
>> We rely on Airflow to schedule and coordinate maintenance Spark jobs. I
>> agree with Ryan that an Iceberg solution is not a good choice here.
>>
>> Thanks,
>> Manu
>>
>> On Tue, Aug 6, 2024 at 1:07 AM Ryan Blue <b...@databricks.com.invalid>
>> wrote:
>>
>>> > We can make sure that the Tasks can tolerate concurrent runs, but as
>>> mentioned in the doc, in most cases having concurrent runs are a waste of
>>> resources, because of the commit conflicts.
>>>
>>> Is the problem that users may configure multiple jobs that are all
>>> trying to run maintenance procedures? If so, isn't this a user-level
>>> problem? If you don't want maintenance job conflicts, only run one
>>> maintenance job.
>>>
>>> > If Spark/Kafka/Flink users all need some kind of locking it might
>>> worth considering coming up with an Iceberg solution for it.
>>>
>>> I don't think that an Iceberg solution is a good choice here.
>>> Coordination in a system that does work on Iceberg tables does not need to
>>> rely on locking in the Iceberg tables. It should have a way to coordinate
>>> externally.
>>>
>>> On Sun, Aug 4, 2024 at 10:29 PM Péter Váry <peter.vary.apa...@gmail.com>
>>> wrote:
>>>
>>>> Thanks everyone for your answers! I really appreciate it, especially
>>>> since these come into during the weekend, using your own time.
>>>>
>>>> @Manu, during our initial discussion, you have mentioned that you had
>>>> similar issues with Spark compactions. You needed locking there. Is it
>>>> still an issue?
>>>>
>>>> If Spark/Kafka/Flink users all need some kind of locking it might worth
>>>> considering coming up with an Iceberg solution for it.
>>>> I see the following possibilities:
>>>> 1. Create a locking feature using the current Catalog API: We could
>>>> create a 'lock' instead of 'tag', and we could use the Catalogs atomic
>>>> change requirement to make locking atomic. My main concern with this
>>>> approach is that it relies on the linear history of the table and produces
>>>> more contention in write side. I was OK with this in the very specific
>>>> use-case with Flink Table Maintenance (few changes, controlled times), but
>>>> as a general solution, I would look somewhere else.
>>>> 2. Add locking to the Catalog API as a requirement: We could widen the
>>>> Catalog API with a lock/unlock optional feature. I'm not sure that we see
>>>> enough use cases to merit such a big change. OTOH this could be a good
>>>> additional feature to the REST Catalog as well, and maybe something we
>>>> would like to add to our Catalog sooner or later.
>>>>
>>>> About the specific case: if we want to integrate Flink Table
>>>> Maintenance to the streaming Flink sinks - and I consider that one of the
>>>> main use cases - we can't rely on external schedulers to prevent concurrent
>>>> writes. We can make sure that the Tasks can tolerate concurrent runs, but
>>>> as mentioned in the doc, in most cases having concurrent runs are a waste
>>>> of resources, because of the commit conflicts.
>>>>
>>>> If we decide to pursue the Flink only solution, I would implement a
>>>> JDBC based locking implementation for the LockFactory interface based on
>>>> the feedback. This seems like the most natural and available external
>>>> storage for locks. Later we could add more implementations based on the
>>>> user needs.
>>>>
>>>> Thanks for the feedback and discussion,
>>>> Peter
>>>>
>>>> On Mon, Aug 5, 2024, 05:18 Manu Zhang <owenzhang1...@gmail.com> wrote:
>>>>
>>>>> Not familiar with Flink, I'm wondering how Flink resolves concurrency
>>>>> issues in common Flink use cases. For example, how does Flink prevent two
>>>>> jobs from writing to the same file?
>>>>>
>>>>> On the other hand, an Iceberg tag is eventually an atomic change to a
>>>>> file. It's the same as using a file lock. I don't think we can handle such
>>>>> cases without relying on external dependencies. We may define interfaces 
>>>>> in
>>>>> Iceberg and let users choose whatever services they can use to implement
>>>>> it. My $0.01.
>>>>>
>>>>> Manu
>>>>>
>>>>>
>>>>> On Mon, Aug 5, 2024 at 8:38 AM Steven Wu <stevenz...@gmail.com> wrote:
>>>>>
>>>>>> I also don't feel it is the best fit to use tags to implement locks
>>>>>> for passing control messages. This is the main sticking point for me from
>>>>>> the design doc. However, we haven't been able to come up with a better
>>>>>> solution yet. Maybe we need to go back to the drawing board again.
>>>>>>
>>>>>> I am also not sure using Kafka topics to send control messages is a
>>>>>> good fit either. It would introduce a dependency on Kafka to run Flink
>>>>>> maintenance jobs. It works for Kafka connect sink, because that is for
>>>>>> Kafka env anyway.
>>>>>>
>>>>>>
>>>>>> On Sun, Aug 4, 2024 at 1:12 PM Ryan Blue <b...@databricks.com.invalid>
>>>>>> wrote:
>>>>>>
>>>>>>>  Hi Péter, thanks for bringing this up.
>>>>>>>
>>>>>>> I don't think using a tag to "lock" a table is a good idea. The doc
>>>>>>> calls out that this is necessary "Since Flink doesn’t provide an out of 
>>>>>>> the
>>>>>>> box solution for downstream operators sending feedback to upstream
>>>>>>> operators" so this feels like using Iceberg metadata as a side-channel
>>>>>>> within a Flink application. That doesn't seem like a good idea to me.
>>>>>>>
>>>>>>> Why not use a separate Kafka topic to send control messages for this
>>>>>>> purpose, like what is done in the Kafka Connect sink? I think that's a
>>>>>>> cleaner way to solve the problem if there is not going to be a way to 
>>>>>>> fix
>>>>>>> it in Flink.
>>>>>>>
>>>>>>> Ryan
>>>>>>>
>>>>>>> On Wed, Jul 31, 2024 at 7:45 AM Péter Váry <
>>>>>>> peter.vary.apa...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Team,
>>>>>>>>
>>>>>>>> During the discussion around the Flink Table Maintenance [1], [2],
>>>>>>>> I have highlighted that one of the main decision points is the way we
>>>>>>>> prevent concurrent Maintenance Tasks from happening concurrently.
>>>>>>>>
>>>>>>>> At that time we did not find better solution than providing an
>>>>>>>> interface for locking, and provide a basic implementation which is 
>>>>>>>> based on
>>>>>>>> Iceberg tags [3]:
>>>>>>>>
>>>>>>>> *"For convenience an Iceberg tag based solution is provided, so no
>>>>>>>> external dependencies are needed. On lock creation a tag named
>>>>>>>> '__flink_maitenance' is created for the current snapshot, and on lock
>>>>>>>> removal this tag is removed. This solution is not ideal, as it creates
>>>>>>>> multiple new versions of the table metadata, and mixes infrastructural
>>>>>>>> metadata with user metadata. If this is not acceptable then other
>>>>>>>> implementations could be created which could use external components 
>>>>>>>> like
>>>>>>>> JDBC/Kafka/ZooKeeper."*
>>>>>>>>
>>>>>>>>
>>>>>>>> We are in the implementation phase, and we agreed with Steven to
>>>>>>>> take one final round with the community, to see if anyone has a better
>>>>>>>> suggestion, or we could proceed with the originally agreed one.
>>>>>>>>
>>>>>>>> So if you have a better idea, please share.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Peter
>>>>>>>>
>>>>>>>> [1] -
>>>>>>>> https://lists.apache.org/thread/qjf83v2xj6lxb9sr8z0v9p0979f8wsmf
>>>>>>>> [2] -
>>>>>>>> https://lists.apache.org/thread/vjf8m5wg840o58yz4y3q35k2mfhbm49l
>>>>>>>> [3] -
>>>>>>>> https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit#heading=h.lt9eaimi6zyz
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Ryan Blue
>>>>>>> Databricks
>>>>>>>
>>>>>>
>>>
>>> --
>>> Ryan Blue
>>> Databricks
>>>
>>

-- 
Ryan Blue
Databricks

Reply via email to