> > We can make sure that the Tasks can tolerate concurrent runs, but as
> > mentioned in the doc, in most cases having concurrent runs is a waste of
> > resources, because of the commit conflicts.
>
> Is the problem that users may configure multiple jobs that are all trying
> to run maintenance procedures? If so, isn't this a user-level problem? If
> you don't want maintenance job conflicts, only run one maintenance job.

There are conflicts even when a single streaming job schedules the tasks.
One example:
- The streaming job writes rows to an append-only table, and Table
Maintenance is scheduled in the PostCommitTopology
- DataFile rewrite is scheduled every X commits, to compact small files
- ManifestFile rewrite is scheduled every Y commits, to decrease the number
of manifest files

DataFile rewrite creates a new manifest file. This means that if a DataFile
rewrite task finishes and commits while a ManifestFile rewrite is running
concurrently, the ManifestFile rewrite will fail. I have played around with
serializing the Maintenance Tasks (the result was working, but very ugly and
hard-to-maintain code). This serialization mitigated some of the issues, but
one problem remained: if there was a delay in executing the rewrites (job
restart, resource starvation, or a longer-than-expected Maintenance Task
run), a consecutively scheduled DataFile/ManifestFile rewrite sometimes
started while the previous DataFile/ManifestFile rewrite was still running.
This again resulted in failures.

If we accept that locking is a requirement, then we can have:
- Simpler code for the flow - the flow becomes easy to understand
- Simpler code for the tasks - no need to separate the messages of the
different runs
- No failures in the logs - such failures are problematic when swallowed,
and make it hard to identify real issues when logged

> I don't think that an Iceberg solution is a good choice here.
> Coordination in a system that does work on Iceberg tables does not need to
> rely on locking in the Iceberg tables. It should have a way to coordinate
> externally.
[..]
> I agree with Ryan that an Iceberg solution is not a good choice here.

I agree that we don't need to *rely* on locking in the Iceberg tables. The
`TriggerLockFactory.Lock` interface is specifically designed to allow the
user to choose the preferred type of locking. I was trying to come up with a
solution where Apache Iceberg users don't need to rely on yet another
external system for Flink Table Maintenance to work.

I understand that this discussion is very similar to the HadoopCatalog
situation, where we have a hacky "solution" which works in some cases but is
suboptimal. And I understand if we don't want to support it.

If nobody else has a better idea, then I will add a default JDBC-based
locking implementation to the PR [1].
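
To make the idea concrete, here is a minimal sketch of how such a JDBC-based
lock could work, using a single-row insert/delete as the atomic
acquire/release. The table and column names below are placeholders, not the
final schema:

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Minimal sketch of a JDBC-backed lock. Acquiring the lock is an INSERT into
// a table with a primary key on (table_name, lock_id), so a second concurrent
// acquire fails on the key constraint. Placeholder names, not the final schema.
class JdbcLockSketch {
  private final Connection connection;
  private final String tableName;
  private final String lockId;

  JdbcLockSketch(Connection connection, String tableName, String lockId) {
    this.connection = connection;
    this.tableName = tableName;
    this.lockId = lockId;
  }

  boolean tryLock() {
    try (PreparedStatement stmt = connection.prepareStatement(
        "INSERT INTO flink_maintenance_lock (table_name, lock_id) VALUES (?, ?)")) {
      stmt.setString(1, tableName);
      stmt.setString(2, lockId);
      return stmt.executeUpdate() == 1;
    } catch (SQLException e) {
      // A failed insert (typically a primary key violation) means another
      // maintenance run already holds the lock.
      return false;
    }
  }

  void unlock() {
    try (PreparedStatement stmt = connection.prepareStatement(
        "DELETE FROM flink_maintenance_lock WHERE table_name = ? AND lock_id = ?")) {
      stmt.setString(1, tableName);
      stmt.setString(2, lockId);
      stmt.executeUpdate();
    } catch (SQLException e) {
      throw new RuntimeException("Failed to release the maintenance lock", e);
    }
  }
}
```

An advisory lock or a row with a heartbeat/expiry column would work equally
well; the insert/delete variant is just the simplest to sketch.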

Thanks,
Peter

[1] https://github.com/apache/iceberg/pull/10484

On Tue, Aug 6, 2024 at 6:25 AM Manu Zhang <owenzhang1...@gmail.com> wrote:

> Hi Peter,
>
> We rely on Airflow to schedule and coordinate maintenance Spark jobs. I
> agree with Ryan that an Iceberg solution is not a good choice here.
>
> Thanks,
> Manu
>
> On Tue, Aug 6, 2024 at 1:07 AM Ryan Blue <b...@databricks.com.invalid>
> wrote:
>
>> > We can make sure that the Tasks can tolerate concurrent runs, but as
>> > mentioned in the doc, in most cases having concurrent runs is a waste of
>> > resources, because of the commit conflicts.
>>
>> Is the problem that users may configure multiple jobs that are all trying
>> to run maintenance procedures? If so, isn't this a user-level problem? If
>> you don't want maintenance job conflicts, only run one maintenance job.
>>
>> > If Spark/Kafka/Flink users all need some kind of locking, it might be
>> > worth considering coming up with an Iceberg solution for it.
>>
>> I don't think that an Iceberg solution is a good choice here.
>> Coordination in a system that does work on Iceberg tables does not need to
>> rely on locking in the Iceberg tables. It should have a way to coordinate
>> externally.
>>
>> On Sun, Aug 4, 2024 at 10:29 PM Péter Váry <peter.vary.apa...@gmail.com>
>> wrote:
>>
>>> Thanks everyone for your answers! I really appreciate it, especially
>>> since these came in during the weekend, on your own time.
>>>
>>> @Manu, during our initial discussion you mentioned that you had similar
>>> issues with Spark compactions and needed locking there. Is it still an
>>> issue?
>>>
>>> If Spark/Kafka/Flink users all need some kind of locking, it might be
>>> worth considering coming up with an Iceberg solution for it.
>>> I see the following possibilities:
>>> 1. Create a locking feature using the current Catalog API: We could
>>> create a 'lock' instead of a 'tag', and we could use the Catalog's atomic
>>> change requirement to make locking atomic. My main concern with this
>>> approach is that it relies on the linear history of the table and produces
>>> more contention on the write side. I was OK with this in the very specific
>>> use case of Flink Table Maintenance (few changes, controlled times), but
>>> as a general solution, I would look somewhere else.
>>> 2. Add locking to the Catalog API as a requirement: We could widen the
>>> Catalog API with an optional lock/unlock feature. I'm not sure that we see
>>> enough use cases to merit such a big change. OTOH this could be a good
>>> additional feature for the REST Catalog as well, and maybe something we
>>> would like to add to our Catalog sooner or later.
>>>
>>> About the specific case: if we want to integrate Flink Table Maintenance
>>> with the streaming Flink sinks - and I consider that one of the main use
>>> cases - we can't rely on external schedulers to prevent concurrent writes.
>>> We can make sure that the Tasks can tolerate concurrent runs, but as
>>> mentioned in the doc, in most cases having concurrent runs is a waste of
>>> resources, because of the commit conflicts.
>>>
>>> If we decide to pursue the Flink-only solution, I would implement
>>> JDBC-based locking for the LockFactory interface, based on the feedback.
>>> This seems like the most natural and readily available external storage
>>> for locks. Later we could add more implementations based on user needs.
>>>
>>> Thanks for the feedback and discussion,
>>> Peter
>>>
>>> On Mon, Aug 5, 2024, 05:18 Manu Zhang <owenzhang1...@gmail.com> wrote:
>>>
>>>> Not familiar with Flink, I'm wondering how Flink resolves concurrency
>>>> issues in common Flink use cases. For example, how does Flink prevent two
>>>> jobs from writing to the same file?
>>>>
>>>> On the other hand, an Iceberg tag is eventually an atomic change to a
>>>> file. It's the same as using a file lock. I don't think we can handle such
>>>> cases without relying on external dependencies. We may define interfaces in
>>>> Iceberg and let users choose whatever services they can use to implement
>>>> it. My $0.01.
>>>>
>>>> Manu
>>>>
>>>>
>>>> On Mon, Aug 5, 2024 at 8:38 AM Steven Wu <stevenz...@gmail.com> wrote:
>>>>
>>>>> I also don't feel it is the best fit to use tags to implement locks
>>>>> for passing control messages. This is the main sticking point for me from
>>>>> the design doc. However, we haven't been able to come up with a better
>>>>> solution yet. Maybe we need to go back to the drawing board again.
>>>>>
>>>>> I am also not sure using Kafka topics to send control messages is a
>>>>> good fit either. It would introduce a dependency on Kafka to run Flink
>>>>> maintenance jobs. It works for the Kafka Connect sink, because that is
>>>>> in a Kafka environment anyway.
>>>>>
>>>>>
>>>>> On Sun, Aug 4, 2024 at 1:12 PM Ryan Blue <b...@databricks.com.invalid>
>>>>> wrote:
>>>>>
>>>>>>  Hi Péter, thanks for bringing this up.
>>>>>>
>>>>>> I don't think using a tag to "lock" a table is a good idea. The doc
>>>>>> calls out that this is necessary "Since Flink doesn’t provide an out of
>>>>>> the box solution for downstream operators sending feedback to upstream
>>>>>> operators" so this feels like using Iceberg metadata as a side-channel
>>>>>> within a Flink application. That doesn't seem like a good idea to me.
>>>>>>
>>>>>> Why not use a separate Kafka topic to send control messages for this
>>>>>> purpose, like what is done in the Kafka Connect sink? I think that's a
>>>>>> cleaner way to solve the problem if there is not going to be a way to fix
>>>>>> it in Flink.
>>>>>>
>>>>>> Ryan
>>>>>>
>>>>>> On Wed, Jul 31, 2024 at 7:45 AM Péter Váry <
>>>>>> peter.vary.apa...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Team,
>>>>>>>
>>>>>>> During the discussion around the Flink Table Maintenance [1], [2], I
>>>>>>> highlighted that one of the main decision points is the way we prevent
>>>>>>> Maintenance Tasks from running concurrently.
>>>>>>>
>>>>>>> At that time we did not find a better solution than providing an
>>>>>>> interface for locking, and providing a basic implementation which is
>>>>>>> based on Iceberg tags [3]:
>>>>>>>
>>>>>>> *"For convenience an Iceberg tag based solution is provided, so no
>>>>>>> external dependencies are needed. On lock creation a tag named
>>>>>>> '__flink_maitenance' is created for the current snapshot, and on lock
>>>>>>> removal this tag is removed. This solution is not ideal, as it creates
>>>>>>> multiple new versions of the table metadata, and mixes infrastructural
>>>>>>> metadata with user metadata. If this is not acceptable then other
>>>>>>> implementations could be created which could use external components
>>>>>>> like JDBC/Kafka/ZooKeeper."*
>>>>>>>
>>>>>>>
>>>>>>> We are in the implementation phase, and Steven and I agreed to take
>>>>>>> one final round with the community to see if anyone has a better
>>>>>>> suggestion, or whether we can proceed with the originally agreed one.
>>>>>>>
>>>>>>> So if you have a better idea, please share.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Peter
>>>>>>>
>>>>>>> [1] -
>>>>>>> https://lists.apache.org/thread/qjf83v2xj6lxb9sr8z0v9p0979f8wsmf
>>>>>>> [2] -
>>>>>>> https://lists.apache.org/thread/vjf8m5wg840o58yz4y3q35k2mfhbm49l
>>>>>>> [3] -
>>>>>>> https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit#heading=h.lt9eaimi6zyz
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Ryan Blue
>>>>>> Databricks
>>>>>>
>>>>>
>>
>> --
>> Ryan Blue
>> Databricks
>>
>