Forwarding the invite for the discussion we plan to have with the Iceberg
folks, as some of you might be interested in this.

---------- Forwarded message ---------
From: Brian Olsen <bitsondata...@gmail.com>
Date: Mon, Apr 8, 2024, 18:29
Subject: Re: Flink table maintenance
To: <d...@iceberg.apache.org>


Hey Iceberg nation,

I would like to share the details of the meeting this Wednesday, where we will
further discuss Péter's proposal on Flink Maintenance Tasks.
Calendar Link: https://calendar.app.google/83HGYWXoQJ8zXuVCA

List discussion:
https://lists.apache.org/thread/10mdf9zo6pn0dfq791nf4w1m7jh9k3sl

Design Doc: Flink table maintenance
<https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit?usp=sharing>



On Mon, Apr 1, 2024 at 8:52 PM Manu Zhang <owenzhang1...@gmail.com> wrote:

> Hi Peter,
>
>> Are you proposing to create a user-facing locking feature in Iceberg, or
>> just something for internal use?
>>
>
> Since it's a general issue, I'm proposing to create a general user
> interface first, while the implementation can be left to users. For
> example, we use Airflow to schedule maintenance jobs and we can check
> in-progress jobs with the Airflow API. Hive metastore lock might be another
> option we can implement for users.
>
> Thanks,
> Manu
>
> On Tue, Apr 2, 2024 at 5:26 AM Péter Váry <peter.vary.apa...@gmail.com>
> wrote:
>
>> Hi Ajantha,
>>
>> I thought about enabling post-commit topology based compaction for sinks
>> using options, like we use for parameterizing streaming reads [1].
>> I think it will be hard to do in a user-friendly way - because of the
>> high number of parameters - but I think it is a possible solution with
>> sensible defaults.
>>
>> There is a batch-like solution for data file compaction already available
>> [2], but I do not see how we could extend Flink SQL to be able to call it.
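>>
>> For reference, a minimal sketch of how the existing batch action [2] is
>> invoked from Java today (assuming an already loaded Table named 'table'):
>>
>>   // org.apache.iceberg.flink.actions.Actions
>>   RewriteDataFilesActionResult result = Actions.forTable(table)
>>       .rewriteDataFiles()
>>       .execute();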
>>
>> Writing to a branch using Flink SQL should be another thread, but my
>> first guess is that it shouldn't be hard to implement using options, like:
>> /*+ OPTIONS('branch'='b1') */
>> since writing to a branch is already working through the Java API [3].
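>>
>> To illustrate, a rough sketch - the Java branch write follows [3], while
>> the SQL hint on the sink is only the proposed syntax, not something that
>> works today ('dataStream', 'tableLoader' and the TableEnvironment 'tEnv'
>> are assumed to be set up already):
>>
>>   // Existing Java API branch write (see [3]).
>>   FlinkSink.forRowData(dataStream)
>>       .tableLoader(tableLoader)
>>       .toBranch("b1")
>>       .append();
>>
>>   // Proposed (hypothetical) SQL equivalent using a dynamic options hint:
>>   tEnv.executeSql(
>>       "INSERT INTO db.tbl /*+ OPTIONS('branch'='b1') */ SELECT * FROM src");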
>>
>> Thanks, Peter
>>
>> 1 -
>> https://iceberg.apache.org/docs/latest/flink-queries/#flink-streaming-read
>> 2 -
>> https://github.com/apache/iceberg/blob/820fc3ceda386149f42db8b54e6db9171d1a3a6d/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java
>> 3 -
>> https://iceberg.apache.org/docs/latest/flink-writes/#branch-writes
>>
>> On Mon, Apr 1, 2024, 16:30 Ajantha Bhat <ajanthab...@gmail.com> wrote:
>>
>>> Thanks for the proposal, Peter.
>>>
>>> I just wanted to know whether we have any plans to support SQL syntax for
>>> table maintenance (like a CALL procedure) for pure Flink SQL users.
>>> I didn't see any custom SQL parser plugin support in Flink. I also saw
>>> that branch writes don't have SQL support (only branch reads use options),
>>> so I am not sure about the roadmap for Iceberg SQL support in Flink.
>>> Was there any discussion about this before?
>>>
>>> - Ajantha
>>>
>>> On Mon, Apr 1, 2024 at 7:51 PM Péter Váry <peter.vary.apa...@gmail.com>
>>> wrote:
>>>
>>>> Hi Manu,
>>>>
>>>> Just to clarify:
>>>> - Are you proposing to create a user-facing locking feature in Iceberg,
>>>> or just something for internal use?
>>>>
>>>> I think we shouldn't add locking to Iceberg's user-facing scope at this
>>>> stage. A fully featured locking system would need many more features
>>>> (priorities, fairness, timeouts, etc.). I could be tempted when we are
>>>> talking about the REST catalog, but I think that should be further down
>>>> the road, if ever...
>>>>
>>>> About using the tags:
>>>> - I whole-heartedly agree that using tags is not intuitive, and I see
>>>> your points in most of your arguments. OTOH, introducing a new requirement
>>>> (a locking mechanism) seems like the wrong direction to me.
>>>> - We already defined a requirement (atomic changes on the table) for
>>>> the Catalog implementations which could be used to achieve our goal here.
>>>> - We also already store technical data in snapshot properties in Flink
>>>> jobs (JobId, OperatorId, CheckpointId), so maybe technical tags/table
>>>> properties are not a big stretch (see the sketch below).
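>>>>
>>>> A minimal sketch of what I mean (the property keys are only illustrative,
>>>> not the connector's exact ones; 'table', 'dataFile', 'jobId' and
>>>> 'checkpointId' are assumed to exist already):
>>>>
>>>>   // Attach technical metadata to a commit via snapshot summary properties.
>>>>   table.newAppend()
>>>>       .appendFile(dataFile)
>>>>       .set("flink.job-id", jobId)
>>>>       .set("flink.max-committed-checkpoint-id", checkpointId)
>>>>       .commit();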
>>>>
>>>> Or we can look at these tags or metadata as 'technical data' which is
>>>> internal to Iceberg and shouldn't be exposed on the external API. My
>>>> concern is:
>>>> - Would it be used often enough to be worth the additional complexity?
>>>>
>>>> Knowing that Spark compaction is struggling with the same issue is a
>>>> good indicator, but we would probably need more use cases before
>>>> introducing a new feature with this complexity, or a simpler solution.
>>>>
>>>> Thanks, Peter
>>>>
>>>>
>>>> On Mon, Apr 1, 2024, 10:18 Manu Zhang <owenzhang1...@gmail.com> wrote:
>>>>
>>>>>> What would the community think of exploiting tags for preventing
>>>>>> concurrent maintenance loop executions?
>>>>>
>>>>>
>>>>> This issue is not specific to Flink maintenance jobs. We have a
>>>>> service scheduling Spark maintenance jobs by watching table commits. If
>>>>> we don't check for in-progress maintenance jobs on the same table,
>>>>> multiple jobs will run concurrently and conflict.
>>>>>
>>>>> Basically, I think we need a locking mechanism like the metastore lock
>>>>> <https://iceberg.apache.org/docs/nightly/configuration/#hadoop-configuration>
>>>>> if we want to handle it for users. However, using a tag doesn't look
>>>>> intuitive to me, and we would also be mixing user data with system
>>>>> metadata. Maybe we can define some general interfaces and leave the
>>>>> implementation to users in the first version.
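>>>>>
>>>>> Purely as a hypothetical sketch of the kind of interface I mean (names
>>>>> are made up; Airflow checks or a Hive metastore lock would be possible
>>>>> implementations):
>>>>>
>>>>>   // Iceberg/Flink would only call this; the implementation is left to
>>>>>   // the user.
>>>>>   public interface MaintenanceLock {
>>>>>     /** Returns false if a maintenance job is already running on the
>>>>>      * table. */
>>>>>     boolean tryLock(String tableName);
>>>>>
>>>>>     /** Releases the lock once the maintenance job has finished. */
>>>>>     void unlock(String tableName);
>>>>>   }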
>>>>>
>>>>> Regards,
>>>>> Manu
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Mar 29, 2024 at 1:59 PM Péter Váry <
>>>>> peter.vary.apa...@gmail.com> wrote:
>>>>>
>>>>>> What would the community think of exploiting tags for preventing
>>>>>> concurrent maintenance loop executions?
>>>>>>
>>>>>> The issue:
>>>>>> Some maintenance tasks can't run in parallel, like DeleteOrphanFiles
>>>>>> vs. ExpireSnapshots, or RewriteDataFiles vs. RewriteManifestFiles. We
>>>>>> make sure not to run tasks started by a single trigger concurrently by
>>>>>> serializing them, but there are no loops in Flink, so we can't
>>>>>> synchronize tasks started by the next trigger.
>>>>>>
>>>>>> In the document, I describe that we need to rely on the user to
>>>>>> ensure that the rate limit is high enough to prevent concurrent triggers.
>>>>>>
>>>>>> Proposal:
>>>>>> When firing a trigger, the RateLimiter could check for and create an
>>>>>> Iceberg table tag [1] on the current table snapshot, with the name
>>>>>> '__flink_maintenance'. When the execution finishes, we remove this tag.
>>>>>> The RateLimiter keeps accumulating changes and doesn't fire new triggers
>>>>>> as long as it finds this tag on the table.
>>>>>> The solution relies on Flink's 'forceNonParallel' to prevent concurrent
>>>>>> execution when placing the tag, and on Iceberg to store it.
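>>>>>>
>>>>>> Roughly like this, as a sketch using the existing Java tagging API [1]
>>>>>> (error handling omitted; assumes the 'table' is already loaded):
>>>>>>
>>>>>>   // Mark the table as "maintenance in progress" with a tag.
>>>>>>   long snapshotId = table.currentSnapshot().snapshotId();
>>>>>>   table.manageSnapshots()
>>>>>>       .createTag("__flink_maintenance", snapshotId)
>>>>>>       .commit();
>>>>>>
>>>>>>   // ... run the maintenance tasks ...
>>>>>>
>>>>>>   // Remove the tag when the tasks finish, so new triggers can fire.
>>>>>>   table.manageSnapshots()
>>>>>>       .removeTag("__flink_maintenance")
>>>>>>       .commit();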
>>>>>>
>>>>>> This doesn't use tags as intended, but it seems like a better solution
>>>>>> than adding/removing table properties, which would clutter the table
>>>>>> history with technical data.
>>>>>>
>>>>>> Your thoughts? Any other suggestions/solutions would be welcome.
>>>>>>
>>>>>> Thanks,
>>>>>> Peter
>>>>>>
>>>>>> [1]
>>>>>> https://iceberg.apache.org/docs/latest/java-api-quickstart/#branching-and-tagging
>>>>>>
>>>>>> On Thu, Mar 28, 2024, 14:44 Péter Váry <peter.vary.apa...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Team,
>>>>>>>
>>>>>>> As discussed in yesterday's community sync, I am working on adding the
>>>>>>> possibility to run maintenance tasks on Iceberg tables from the Flink
>>>>>>> Iceberg connector. This will fix the small-files issue and, in the long
>>>>>>> run, help compact the high number of positional and equality deletes
>>>>>>> created by Flink tasks writing CDC data to Iceberg tables, without the
>>>>>>> need for Spark in the infrastructure.
>>>>>>>
>>>>>>> I did some planning and prototyping, and I am currently trying out the
>>>>>>> solution on a larger scale.
>>>>>>>
>>>>>>> I put together a document describing how my current solution looks:
>>>>>>>
>>>>>>> https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit?usp=sharing
>>>>>>>
>>>>>>> I would love to hear your thoughts and feedback on this to find a
>>>>>>> good final solution.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Peter
>>>>>>>
>>>>>>
