Thanks everyone for participating in the discussion! This was
really helpful!

I have updated the document, hopefully incorporating all of the
comments and suggestions, and finalized the framework part of the document.
I left the part describing the specific Maintenance Tasks in draft (marked
as such in the document). Please review:
https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA

Here is how I plan to move forward - mirroring the Iceberg change / Flink
FLIP process:

   1. Start a vote on the framework with the closing time of next Wednesday
   (8th May) -
   https://lists.apache.org/thread/qhz17ppdbb57ql356j49qqk3nyk59rvm
   2. If the vote is successful, we would start the implementation process:
      - Rod is already working on the SinkV2 implementation (
      https://github.com/apache/iceberg/pull/10179)
      - Monitor Source
      - Trigger Manager
      - Commit Converter
   3. In parallel we can start finalizing the specific Maintenance Tasks in
   the following order:
      1. Expire Snapshots - this is the simplest of the Maintenance Tasks,
      so we probably would like to start with it
      2. Rewrite Data Files - we have an existing implementation which has
      some parts we can reuse
      3. Rewrite Manifest Files - probably this is the most complex of the
      tasks
      4. Delete Orphan Files - I think this one has the least immediate
      value

Please share your thoughts, and vote on the thread:
https://lists.apache.org/thread/qhz17ppdbb57ql356j49qqk3nyk59rvm

Thanks,
Peter

On Mon, Apr 22, 2024 at 16:06, Péter Váry <peter.vary.apa...@gmail.com> wrote:

> Hi Nathan Ma,
>
> Thanks for joining the discussion!
>
> 1-2. In the other thread (
> https://lists.apache.org/thread/10mdf9zo6pn0dfq791nf4w1m7jh9k3sl) we
> discussed the resource consumption/buffers/checkpointing in detail. We
> agreed that even if we try to do everything to separate the stream
> processing from the maintenance tasks there will be conflicts. So we
> decided to prioritize the Monitor based solution as it is more general, and
> better at separating resource usage from the main writer job. The
> PostCommitTopology based integration has its own uses, but might have a
> lower priority.
>
> 3. For equality deletes we have the following options:
>
>    - DataFileCompaction takes care of the equality deletes, as the old
>    equality delete files are not applied to the rewritten files
>       - Cost: O(n) read + O(n) write - n the size of the table
>       - Good: No new maintenance task is needed
>       - Bad: The whole table needs to be rewritten
>    - There were discussions about rewriting equality delete files to
>    positional delete files. I am not sure this would be worth it, as we
>    still need to read every old data file
>       - Cost: O(n) read + O(m) write - n the size of the table, m the
>       number of the deleted rows
>       - Good: Lower cost
>       - Bad: The whole table needs to be read, and the positional delete
>       files still need to be used later
>
> If you have time you can run some tests about the cost vs. gains with the
> equality delete to positional delete conversion. Keep in mind that we
> probably would like to have DeleteGranularity.FILE for the positional
> delete files, so the delete files are not conflicting with the compaction
> commits (See: https://github.com/apache/iceberg/pull/10200)
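
As a rough illustration of the two cost estimates quoted above, here is a minimal sketch that treats n and m as abstract row counts. The function and class names are hypothetical and not part of Iceberg or Flink; real costs also depend on file sizes, formats, and parallelism, so this is only a back-of-the-envelope comparison.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Cost:
    """Abstract I/O cost in 'rows touched' (an assumed unit)."""
    read: int
    write: int

    @property
    def total(self) -> int:
        return self.read + self.write


def compaction_cost(table_rows: int) -> Cost:
    # Data file compaction: O(n) read + O(n) write.
    return Cost(read=table_rows, write=table_rows)


def conversion_cost(table_rows: int, deleted_rows: int) -> Cost:
    # Equality-to-positional delete conversion: O(n) read + O(m) write.
    return Cost(read=table_rows, write=deleted_rows)


if __name__ == "__main__":
    n, m = 1_000_000, 10_000  # made-up example sizes
    print("compaction:", compaction_cost(n).total)
    print("conversion:", conversion_cost(n, m).total)
```

Under this simple model the conversion only pays off on the write side; the read cost is the same for both options.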
>
> Thanks,
> Peter
>
> On Mon, Apr 22, 2024 at 14:12, nathan ma <majin1...@gmail.com> wrote:
>
>> Hi guys, thanks for raising this wonderful thread, and thanks to
>> @peter.vary.apa...@gmail.com, the doc really makes things clear.
>>
>> I have several questions to catch up on, as I didn't find answers in the
>> document. I would appreciate it if anyone could give some feedback, because
>> these issues are really important for Flink production environments.
>>
>> 1. Will the commit topology approach block stream processing when
>> maintenance tasks, especially compaction, run too slowly, or when resources
>> (subtask thread count) are not sufficient? This is quite a Flink-specific
>> issue and I wonder whether it could be fixed in the commit topology. I
>> think sync and buffer events may somehow resolve this issue, which I think
>> should be seriously discussed for production use cases.
>> 2. How do we deal with CRUD operations while Flink stream processing is
>> running? For example, I could run a Spark 'UPDATE' SQL statement while a
>> Flink ingestion job is running on the same table. Will the maintenance
>> tasks include optimization for such CRUD changes, or just process events
>> from upstream operators? From my view, a single monitor job seems usable
>> for all cases, while the commit topology approach assumes that only this
>> Flink job writes data to the table.
>> 3. In our scenarios, we found that equality-delete files heavily impact
>> read performance, while the impact of positional-delete files (when small
>> enough) can be negligible. Shall we design something around this
>> phenomenon?
>>
>> I am new to this thread, and our team has some experience with Flink on
>> Iceberg improvements. It would be my honor if I could join this thread
>> and make my contributions.
>>
>> Is there a separate Flink dev mailing list for Iceberg, or is everything
>> on the dev list? How and when can a discussion be set up?
>>
>> On Mon, Apr 22, 2024 at 18:22, Péter Váry <peter.vary.apa...@gmail.com> wrote:
>>
>>> Hi Team,
>>>
>>> The discussion continued on the previous thread by Gen Luo and Zhu Zhu.
>>> See: https://lists.apache.org/thread/10mdf9zo6pn0dfq791nf4w1m7jh9k3sl.
>>> Adding them to this thread too, so we can continue the discussion in one
>>> place. Based on their thoughts there, the current suggestion is:
>>>
>>> Q: Do you have concerns with the Maintenance Tasks execution
>>> infrastructure?
>>> A:
>>> Topology:
>>>
>>>    - We should prioritize the Monitor based solution as it is more
>>>    general, and better at separating resource usage from the main writer
>>>    jobs
>>>    - PostCommitTopology based integration has its own uses, but might
>>>    have a lower priority
>>>
>>> Checkpointing:
>>>
>>>    - We should consider AsyncIO as much as possible to prevent blocking
>>>    of checkpoint creation
>>>    - Because of the current limitations of AsyncIO, unaligned
>>>    checkpoints could be useful when the task size grows
>>>
>>>
>>> Q: Do you have use-cases for other Maintenance Task(s) than mentioned in
>>> the doc?
>>> A: Data file rewrite (Global/Incremental), Expire snapshots, Manifest
>>> files rewrite, Delete orphan files - no new tasks came up
>>>
>>> Q: Do you think more scheduling information is needed than mentioned in
>>> the doc?
>>> A: Commit number, New data file number, New data file size, New delete
>>> file number, Elapsed time since the last run - no new scheduling
>>> information came up
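
To make the scheduling inputs listed above concrete, here is a minimal sketch of how a scheduler might compare accumulated changes against thresholds. All names (`ChangeStats`, `should_trigger`) and the "fire when any threshold is reached" rule are assumptions for illustration; the design document may define the trigger logic differently.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ChangeStats:
    """Accumulated changes since the last maintenance run (hypothetical)."""
    commits: int = 0
    new_data_files: int = 0
    new_data_file_bytes: int = 0
    new_delete_files: int = 0
    seconds_since_last_run: float = 0.0


def should_trigger(stats: ChangeStats, limits: ChangeStats) -> bool:
    # Assumed rule: fire when ANY single metric reaches its threshold.
    return (
        stats.commits >= limits.commits
        or stats.new_data_files >= limits.new_data_files
        or stats.new_data_file_bytes >= limits.new_data_file_bytes
        or stats.new_delete_files >= limits.new_delete_files
        or stats.seconds_since_last_run >= limits.seconds_since_last_run
    )


# Example thresholds (made-up values).
LIMITS = ChangeStats(
    commits=10,
    new_data_files=100,
    new_data_file_bytes=1 << 30,
    new_delete_files=50,
    seconds_since_last_run=3600.0,
)
```

For example, `should_trigger(ChangeStats(commits=3, new_delete_files=50), LIMITS)` fires because the delete file threshold is reached, even though the commit count is still low.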
>>>
>>> Q: How to prevent concurrent Maintenance Tasks?
>>> A: Discussed different possible solutions, but the best bet is still the
>>> tag based locking
>>>
>>> I have yet to hear back from most of you on Flink table maintenance. If
>>> you have time, please share your thoughts.
>>> If there are no more concerns, I would love to close this part of the
>>> design and move forward with the implementation.
>>>
>>> Thanks,
>>> Peter
>>>
>>>
>>>
>>> On Fri, Apr 12, 2024 at 7:58, Brian Olsen <bitsondata...@gmail.com> wrote:
>>>
>>>> Hey everyone,
>>>>
>>>> Following up from this meeting, we decided due to the distributed
>>>> nature of everyone involved on Flink, it would be best to achieve consensus
>>>> on some critical points that Péter outlined for us.
>>>>
>>>>
>>>>    - Do you have concerns with the Maintenance Tasks execution
>>>>    infrastructure? The current plan is:
>>>>
>>>>
>>>>    - Streaming tasks to continuously execute the Maintenance Tasks
>>>>    - SinkV2 PostCommitTopology or Monitor service to collect the
>>>>    changes
>>>>    - Scheduler to decide which Maintenance Task(s) to run
>>>>    - Serialized Maintenance Task execution
>>>>
>>>>
>>>>    - Do you have use-cases for other Maintenance Task(s) than
>>>>    mentioned in the doc? Do you have different priorities?
>>>>
>>>>
>>>>    - Data file rewrite
>>>>
>>>>
>>>>    - Global for rewriting existing and new files - equality/positional
>>>>    delete removal, repartitioning etc
>>>>    - Incremental for rewriting only new files - no metadata read needed
>>>>
>>>>
>>>>    - Expire snapshots
>>>>    - Manifest files rewrite
>>>>    - Delete orphan files
>>>>    - Positional delete files - probably not needed for Flink
>>>>
>>>>
>>>>    - Do you think more scheduling information is needed than mentioned
>>>>    in the doc?
>>>>
>>>>
>>>>    - Commit number
>>>>    - New data file number
>>>>    - New data file size
>>>>    - New delete file number
>>>>    - Elapsed time since the last run
>>>>
>>>>
>>>>    - How to prevent concurrent Maintenance Tasks?
>>>>
>>>>
>>>>    - Just run, and retry - will clutter the logs with failures, and
>>>>    waste resources
>>>>    - External locking
>>>>    - Using Iceberg table tags to prevent concurrent runs - mixing user
>>>>    data with technical data
>>>>    - We need a better solution :)
>>>>
>>>>
>>>>
>>>> I will be following up with these members if I don't see them reply in
>>>> the following week. If you have no comment and you are on this list, please
>>>> reply 'looks good' to acknowledge you have seen it. Anyone is free to chime
>>>> in, we just wanted a list of required eyes before assuming consensus.
>>>>
>>>> Péter
>>>> Ryan Blue
>>>> Daniel C. Weeks
>>>> Russell Spitzer
>>>> Steven Wu
>>>> Bryan Keller
>>>> Manu Zhang
>>>> Ajantha Bhat
>>>> Yanghao Lin
>>>> Thanks everyone!!
>>>>
>>>> On Thu, Apr 11, 2024 at 7:13 AM wenjin <wenjin...@gmail.com> wrote:
>>>>
>>>>> Hi Peter,
>>>>>
>>>>> I am interested in your proposal, and with your help I have cleared up
>>>>> some of my confusion in the Flink community. Thanks for your answers.
>>>>>
>>>>> I participated in yesterday’s discussion, but since I am not a native
>>>>> English speaker, I am concerned that I may have missed some details
>>>>> regarding the follow-up actions for this proposal. So, if possible, please
>>>>> involve me when there are further discussions or updates.
>>>>>
>>>>> Thanks,
>>>>> wenjin
>>>>>
>>>>> On 2024/04/09 04:48:48 Péter Váry wrote:
>>>>> > Forwarding the invite for the discussion we plan to do with the
>>>>> > Iceberg folks, as some of you might be interested in this.
>>>>> >
>>>>> > ---------- Forwarded message ---------
>>>>> > From: Brian Olsen <bi...@gmail.com>
>>>>> > Date: Mon, Apr 8, 2024, 18:29
>>>>> > Subject: Re: Flink table maintenance
>>>>> > To: <de...@iceberg.apache.org>
>>>>> >
>>>>> >
>>>>> > Hey Iceberg nation,
>>>>> >
>>>>> > I would like to share about the meeting this Wednesday to further
>>>>> > discuss details of Péter's proposal on Flink Maintenance Tasks.
>>>>> > Calendar Link: https://calendar.app.google/83HGYWXoQJ8zXuVCA
>>>>> >
>>>>> > List discussion:
>>>>> > https://lists.apache.org/thread/10mdf9zo6pn0dfq791nf4w1m7jh9k3sl
>>>>> >
>>>>> > Design Doc: Flink table maintenance
>>>>> > https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit?usp=sharing
>>>>> >
>>>>> >
>>>>> >
>>>>> > On Mon, Apr 1, 2024 at 8:52 PM Manu Zhang <ow...@gmail.com> wrote:
>>>>> >
>>>>> > > Hi Peter,
>>>>> > >
>>>>> > >> Are you proposing to create a user facing locking feature in Iceberg,
>>>>> > >> or just something for internal use?
>>>>> > >>
>>>>> > >
>>>>> > > Since it's a general issue, I'm proposing to create a general user
>>>>> > > interface first, while the implementation can be left to users. For
>>>>> > > example, we use Airflow to schedule maintenance jobs and we can check
>>>>> > > in-progress jobs with the Airflow API. Hive metastore lock might be
>>>>> > > another option we can implement for users.
>>>>> > >
>>>>> > > Thanks,
>>>>> > > Manu
>>>>> > >
>>>>> > > On Tue, Apr 2, 2024 at 5:26 AM Péter Váry <pe...@gmail.com>
>>>>> > > wrote:
>>>>> > >
>>>>> > >> Hi Ajantha,
>>>>> > >>
>>>>> > >> I thought about enabling post commit topology based compaction for
>>>>> > >> sinks using options, like we use for the parametrization of
>>>>> > >> streaming reads [1]. I think it will be hard to do it in a user
>>>>> > >> friendly way - because of the high number of parameters -, but I
>>>>> > >> think it is a possible solution with sensible defaults.
>>>>> > >>
>>>>> > >> There is a batch-like solution for data file compaction already
>>>>> > >> available [2], but I do not see how we could extend Flink SQL to be
>>>>> > >> able to call it.
>>>>> > >>
>>>>> > >> Writing to a branch using Flink SQL should be another thread, but by
>>>>> > >> my first guess, it shouldn't be hard to implement using options, like:
>>>>> > >> /*+ OPTIONS('branch'='b1') */
>>>>> > >> Since writing to a branch is already working through the Java API [3].
>>>>> > >>
>>>>> > >> Thanks, Peter
>>>>> > >>
>>>>> > >> 1 - https://iceberg.apache.org/docs/latest/flink-queries/#flink-streaming-read
>>>>> > >> 2 - https://github.com/apache/iceberg/blob/820fc3ceda386149f42db8b54e6db9171d1a3a6d/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java
>>>>> > >> 3 - https://iceberg.apache.org/docs/latest/flink-writes/#branch-writes
>>>>> > >>
>>>>> > >> On Mon, Apr 1, 2024, 16:30 Ajantha Bhat <aj...@gmail.com> wrote:
>>>>> > >>
>>>>> > >>> Thanks for the proposal Peter.
>>>>> > >>>
>>>>> > >>> I just wanted to know: do we have any plans for supporting SQL syntax
>>>>> > >>> for table maintenance (like CALL procedure) for pure Flink SQL users?
>>>>> > >>> I didn't see any custom SQL parser plugin support in Flink. I also
>>>>> > >>> saw that Branch write doesn't have SQL support (only Branch reads use
>>>>> > >>> Option), so I am not sure about the roadmap of Iceberg SQL support in
>>>>> > >>> Flink.
>>>>> > >>> Was there any discussion before?
>>>>> > >>>
>>>>> > >>> - Ajantha
>>>>> > >>>
>>>>> > >>> On Mon, Apr 1, 2024 at 7:51 PM Péter Váry <pe...@gmail.com>
>>>>> > >>> wrote:
>>>>> > >>>
>>>>> > >>>> Hi Manu,
>>>>> > >>>>
>>>>> > >>>> Just to clarify:
>>>>> > >>>> - Are you proposing to create a user facing locking feature in
>>>>> > >>>> Iceberg, or just something for internal use?
>>>>> > >>>>
>>>>> > >>>> I think we shouldn't add locking to Iceberg's user facing scope at
>>>>> > >>>> this stage. A fully featured locking system has many more features
>>>>> > >>>> that we need (priorities, fairness, timeouts etc). I could be tempted
>>>>> > >>>> when we are talking about the REST catalog, but I think that should
>>>>> > >>>> be further down the road, if ever...
>>>>> > >>>>
>>>>> > >>>> About using the tags:
>>>>> > >>>> - I whole-heartedly agree that using tags is not intuitive, and I see
>>>>> > >>>> your points in most of your arguments. OTOH, introducing a new
>>>>> > >>>> requirement (locking mechanism) seems like a wrong direction to me.
>>>>> > >>>> - We already defined a requirement (atomic changes on the table) for
>>>>> > >>>> the Catalog implementations which could be used to achieve our goal
>>>>> > >>>> here.
>>>>> > >>>> - We also already store technical data in snapshot properties in
>>>>> > >>>> Flink jobs (JobId, OperatorId, CheckpointId). Maybe technical
>>>>> > >>>> tags/table properties is not a big stretch.
>>>>> > >>>>
>>>>> > >>>> Or we can look at these tags or metadata as 'technical data' which is
>>>>> > >>>> internal to Iceberg, and shouldn't be expressed on the external API.
>>>>> > >>>> My concern is:
>>>>> > >>>> - Would it be used often enough to be worth the additional
>>>>> > >>>> complexity?
>>>>> > >>>>
>>>>> > >>>> Knowing that Spark compaction is struggling with the same issue is a
>>>>> > >>>> good indicator, but probably we would need more use cases for
>>>>> > >>>> introducing a new feature with this complexity, or a simpler solution.
>>>>> > >>>>
>>>>> > >>>> Thanks, Peter
>>>>> > >>>>
>>>>> > >>>>
>>>>> > >>>> On Mon, Apr 1, 2024, 10:18 Manu Zhang <ow...@gmail.com> wrote:
>>>>> > >>>>
>>>>> > >>>>>> What would the community think of exploiting tags for preventing
>>>>> > >>>>>> concurrent maintenance loop executions.
>>>>> > >>>>>
>>>>> > >>>>>
>>>>> > >>>>> This issue is not specific to Flink maintenance jobs. We have a
>>>>> > >>>>> service scheduling Spark maintenance jobs by watching table commits.
>>>>> > >>>>> When we don't check in-progress maintenance jobs for the same table,
>>>>> > >>>>> multiple jobs will run concurrently and have conflicts.
>>>>> > >>>>>
>>>>> > >>>>> Basically, I think we need a lock mechanism like the metastore lock
>>>>> > >>>>> <https://iceberg.apache.org/docs/nightly/configuration/#hadoop-configuration>
>>>>> > >>>>> if we want to handle it for users. However, using TAG doesn't look
>>>>> > >>>>> intuitive to me. We are also mixing user data with system metadata.
>>>>> > >>>>> Maybe we can define some general interfaces and leave the
>>>>> > >>>>> implementation to users in the first version.
>>>>> > >>>>>
>>>>> > >>>>> Regards,
>>>>> > >>>>> Manu
>>>>> > >>>>>
>>>>> > >>>>>
>>>>> > >>>>>
>>>>> > >>>>> On Fri, Mar 29, 2024 at 1:59 PM Péter Váry <
>>>>> > >>>>> peter.vary.apa...@gmail.com> wrote:
>>>>> > >>>>>
>>>>> > >>>>>> What would the community think of exploiting tags for preventing
>>>>> > >>>>>> concurrent maintenance loop executions?
>>>>> > >>>>>>
>>>>> > >>>>>> The issue:
>>>>> > >>>>>> Some maintenance tasks can't run in parallel, like DeleteOrphanFiles
>>>>> > >>>>>> vs. ExpireSnapshots, or RewriteDataFiles vs. RewriteManifestFiles.
>>>>> > >>>>>> We make sure not to run tasks started by a single trigger
>>>>> > >>>>>> concurrently by serializing them, but there are no loops in Flink,
>>>>> > >>>>>> so we can't synchronize tasks started by the next trigger.
>>>>> > >>>>>>
>>>>> > >>>>>> In the document, I describe that we need to rely on the user to
>>>>> > >>>>>> ensure that the rate limit is high enough to prevent concurrent
>>>>> > >>>>>> triggers.
>>>>> > >>>>>>
>>>>> > >>>>>> Proposal:
>>>>> > >>>>>> When firing a trigger, RateLimiter could check and create an Iceberg
>>>>> > >>>>>> table tag [1] for the current table snapshot, with the name:
>>>>> > >>>>>> '__flink_maitenance'. When the execution finishes we remove this
>>>>> > >>>>>> tag. The RateLimiter keeps accumulating changes, and doesn't fire
>>>>> > >>>>>> new triggers while it finds this tag on the table.
>>>>> > >>>>>> The solution relies on Flink 'forceNonParallel' to prevent
>>>>> > >>>>>> concurrent execution of placing the tag, and on Iceberg to store it.
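
The tag-based guard proposed above can be sketched as follows. This is a minimal in-memory simulation under stated assumptions: `InMemoryTable` is a hypothetical stand-in for an Iceberg table (no real Iceberg or Flink API is used), the tag name is copied verbatim from the proposal, and the sketch follows one reading of the proposal in which triggers are held back while the tag is present.

```python
LOCK_TAG = "__flink_maitenance"  # name copied verbatim from the proposal


class InMemoryTable:
    """Hypothetical stand-in for an Iceberg table; tracks tag names only."""

    def __init__(self):
        self.tags = set()


class RateLimiter:
    """Fires a trigger once enough changes piled up and no run is active."""

    def __init__(self, table, min_changes):
        self.table = table
        self.min_changes = min_changes
        self.pending = 0  # changes accumulated since the last trigger

    def on_change(self, count=1):
        """Accumulate changes; return True if a trigger was fired."""
        self.pending += count
        if self.pending < self.min_changes:
            return False
        if LOCK_TAG in self.table.tags:
            # A maintenance run is in progress: keep accumulating.
            return False
        # Check-and-create must not run concurrently; the proposal relies
        # on a non-parallel operator for this, which is assumed here.
        self.table.tags.add(LOCK_TAG)
        self.pending = 0
        return True

    def on_maintenance_finished(self):
        """Remove the tag so that the next trigger can fire."""
        self.table.tags.discard(LOCK_TAG)
```

With `min_changes=2`, the second change fires a trigger and places the tag; further changes only accumulate until `on_maintenance_finished()` removes the tag, after which the next change fires again.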
>>>>> > >>>>>>
>>>>> > >>>>>> This does not use the tags as intended, but seems like a better
>>>>> > >>>>>> solution than adding/removing table properties which would clutter
>>>>> > >>>>>> the table history with technical data.
>>>>> > >>>>>>
>>>>> > >>>>>> Your thoughts? Any other suggestions/solutions would be welcome.
>>>>> > >>>>>>
>>>>> > >>>>>> Thanks,
>>>>> > >>>>>> Peter
>>>>> > >>>>>>
>>>>> > >>>>>> [1] https://iceberg.apache.org/docs/latest/java-api-quickstart/#branching-and-tagging
>>>>> > >>>>>>
>>>>> > >>>>>> On Thu, Mar 28, 2024, 14:44 Péter Váry <pe...@gmail.com>
>>>>> > >>>>>> wrote:
>>>>> > >>>>>>
>>>>> > >>>>>>> Hi Team,
>>>>> > >>>>>>>
>>>>> > >>>>>>> As discussed on yesterday's community sync, I am working on adding
>>>>> > >>>>>>> a possibility to the Flink Iceberg connector to run maintenance
>>>>> > >>>>>>> tasks on the Iceberg tables. This will fix the small files issues
>>>>> > >>>>>>> and in the long run help compacting the high number of positional
>>>>> > >>>>>>> and equality deletes created by Flink tasks writing CDC data to
>>>>> > >>>>>>> Iceberg tables without the need of Spark in the infrastructure.
>>>>> > >>>>>>>
>>>>> > >>>>>>> I did some planning and prototyping, and am currently trying out
>>>>> > >>>>>>> the solution on a larger scale.
>>>>> > >>>>>>>
>>>>> > >>>>>>> I put together a document describing what my current solution
>>>>> > >>>>>>> looks like:
>>>>> > >>>>>>>
>>>>> > >>>>>>> https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit?usp=sharing
>>>>> > >>>>>>>
>>>>> > >>>>>>> I would love to hear your thoughts and feedback on this to find a
>>>>> > >>>>>>> good final solution.
>>>>> > >>>>>>>
>>>>> > >>>>>>> Thanks,
>>>>> > >>>>>>> Peter
>>>>> > >>>>>>>
>>>>> > >>>>>>
>>>>> >
>>>>
>>>>
