Hi Team,

The discussion continued on the previous thread by Gen Luo and Zhu Zhu.
See: https://lists.apache.org/thread/10mdf9zo6pn0dfq791nf4w1m7jh9k3sl.
Adding them to this thread too, so we can continue the discussion in one
place. Based on their thoughts there, the current suggestion is:

Q: Do you have concerns with the Maintenance Tasks execution infrastructure?
A:
Topology:

   - We should prioritize the Monitor based solution, as it is more general
   and better at separating resource usage from the main writer jobs (a rough
   sketch of such a monitor follows this list)
   - PostCommitTopology based integration has its own uses, but might have a
   lower priority
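
To make the Monitor based option more concrete, here is a rough sketch of
what such a monitor could look like. The class name, the polling approach and
the emitted payload are all illustrative, not the proposed implementation:

import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;

// Hypothetical sketch: periodically reload the Iceberg table and emit the
// latest snapshot id whenever a new commit is detected, so a downstream
// scheduler can decide whether to trigger a maintenance task.
public class TableChangeMonitor extends RichSourceFunction<Long> {
  private final TableLoader tableLoader;
  private final long pollIntervalMs;
  private volatile boolean running = true;

  public TableChangeMonitor(TableLoader tableLoader, long pollIntervalMs) {
    this.tableLoader = tableLoader;
    this.pollIntervalMs = pollIntervalMs;
  }

  @Override
  public void run(SourceContext<Long> ctx) throws Exception {
    tableLoader.open();
    Table table = tableLoader.loadTable();
    long lastSeen = -1L;
    while (running) {
      table.refresh();
      Snapshot current = table.currentSnapshot();
      if (current != null && current.snapshotId() != lastSeen) {
        lastSeen = current.snapshotId();
        // Emit the new snapshot id under the checkpoint lock.
        synchronized (ctx.getCheckpointLock()) {
          ctx.collect(lastSeen);
        }
      }
      Thread.sleep(pollIntervalMs);
    }
  }

  @Override
  public void cancel() {
    running = false;
  }
}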

Checkpointing:

   - We should consider AsyncIO as much as possible to prevent blocking
   checkpoint creation (see the sketch after this list)
   - Because of the current limitations of AsyncIO, unaligned checkpoints
   could be useful when the task size grows
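
To show the AsyncIO pattern we have in mind, here is a minimal sketch that
wraps a hypothetical file-deletion step, so the operator thread stays free
and checkpoint barriers keep flowing. The operator and its wiring are
illustrative only:

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.iceberg.io.FileIO;

// Illustrative only: the delete runs on a background thread and the result
// future is completed asynchronously, so the operator thread is never blocked.
public class AsyncDeleteFiles extends RichAsyncFunction<String, Boolean> {
  private final FileIO io;

  public AsyncDeleteFiles(FileIO io) {
    this.io = io;
  }

  @Override
  public void asyncInvoke(String path, ResultFuture<Boolean> resultFuture) {
    CompletableFuture.runAsync(() -> io.deleteFile(path))
        .whenComplete((ignored, error) ->
            resultFuture.complete(Collections.singleton(error == null)));
  }

  // Wiring: at most 100 in-flight deletes, 60 second timeout per element.
  public static DataStream<Boolean> wire(DataStream<String> paths, FileIO io) {
    return AsyncDataStream.unorderedWait(
        paths, new AsyncDeleteFiles(io), 60, TimeUnit.SECONDS, 100);
  }
}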


Q: Do you have use-cases for Maintenance Task(s) other than those mentioned
in the doc?
A: Data file rewrite (Global/Incremental), Expire snapshots, Manifest files
rewrite, Delete orphan files - no new tasks came up. For data file rewrite,
the already existing batch-style Flink action is sketched below for reference.
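
For reference, the batch-style data file rewrite that already exists in the
Flink connector can be invoked roughly like this (the table location and the
target file size are just example values):

import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.actions.Actions;

public class CompactOnce {
  public static void main(String[] args) {
    // Example table location only.
    TableLoader loader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/db/tbl");
    loader.open();
    Table table = loader.loadTable();

    // Rewrite small data files into ~128 MB files (target size is illustrative).
    Actions.forTable(table)
        .rewriteDataFiles()
        .targetSizeInBytes(128L * 1024 * 1024)
        .execute();
  }
}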

Q: Do you think more scheduling information is needed than mentioned in the
doc?
A: Commit number, New data file number, New data file size, New delete file
number, Elapsed time since the last run - no new scheduling information came
up. A hypothetical trigger check combining these signals is sketched below.
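
To illustrate how these signals could be combined, a hypothetical trigger
check might look like the class below; the name and the thresholds are made
up for the example, and any single threshold being reached fires the trigger:

// Hypothetical illustration of turning the listed signals into a
// "should we trigger this maintenance task now" decision.
public class TriggerEvaluator {
  private final int commitCountThreshold;      // commit number
  private final int dataFileCountThreshold;    // new data file number
  private final long dataFileSizeThreshold;    // new data file size in bytes
  private final int deleteFileCountThreshold;  // new delete file number
  private final long maxElapsedMs;             // elapsed time since the last run

  public TriggerEvaluator(int commits, int dataFiles, long dataFileBytes,
      int deleteFiles, long maxElapsedMs) {
    this.commitCountThreshold = commits;
    this.dataFileCountThreshold = dataFiles;
    this.dataFileSizeThreshold = dataFileBytes;
    this.deleteFileCountThreshold = deleteFiles;
    this.maxElapsedMs = maxElapsedMs;
  }

  public boolean shouldTrigger(int commits, int dataFiles, long dataFileBytes,
      int deleteFiles, long elapsedMs) {
    return commits >= commitCountThreshold
        || dataFiles >= dataFileCountThreshold
        || dataFileBytes >= dataFileSizeThreshold
        || deleteFiles >= deleteFileCountThreshold
        || elapsedMs >= maxElapsedMs;
  }
}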

Q: How to prevent concurrent Maintenance Tasks?
A: We discussed different possible solutions, but the best bet is still the
tag based locking; a rough sketch of the idea follows.
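
The idea, sketched against the Iceberg API (the tag name and the error
handling are illustrative only; acquiring the "lock" relies on the atomicity
of the commit that creates the tag):

import org.apache.iceberg.Table;

public class TagLock {
  // Proposed tag name from the earlier discussion.
  static final String LOCK_TAG = "__flink_maintenance";

  // Returns true if we acquired the "lock" by tagging the current snapshot.
  public static boolean tryAcquire(Table table) {
    table.refresh();
    if (table.currentSnapshot() == null || table.refs().containsKey(LOCK_TAG)) {
      return false;  // nothing to tag yet, or maintenance is already in flight
    }
    try {
      table.manageSnapshots()
          .createTag(LOCK_TAG, table.currentSnapshot().snapshotId())
          .commit();
      return true;
    } catch (Exception e) {
      return false;  // lost the race to a concurrent maintenance run
    }
  }

  public static void release(Table table) {
    table.manageSnapshots().removeTag(LOCK_TAG).commit();
  }
}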

I have yet to hear back from most of you on Flink table maintenance. If you
have time, please share your thoughts.
If there are no more concerns, I would love to close this part of the
design and move forward with the implementation.

Thanks,
Peter



On Fri, Apr 12, 2024 at 7:58 AM Brian Olsen <bitsondata...@gmail.com> wrote:

> Hey everyone,
>
> Following up from this meeting, we decided that, due to the distributed
> nature of everyone involved with Flink, it would be best to reach consensus
> on some critical points that Péter outlined for us.
>
>
>    - Do you have concerns with the Maintenance Tasks execution
>    infrastructure? The current plan is:
>
>
>    - Streaming tasks to continuously execute the Maintenance Tasks
>    - SinkV2 PostCommitTopology or Monitor service to collect the changes
>    - Scheduler to decide which Maintenance Task(s) to run
>    - Serialized Maintenance Task execution
>
>
>    - Do you have use-cases for other Maintenance Task(s) than mentioned
>    in the doc? Do you have different priorities?
>
>
>    - Data file rewrite
>
>
>    - Global for rewriting existing and new files - equality/positional
>    delete removal, repartitioning etc
>    - Incremental for rewriting only new files - no metadata read needed
>
>
>    - Expire snapshots
>    - Manifest files rewrite
>    - Delete orphan files
>    - Positional delete files - probably not needed for Flink
>
>
>    - Do you think more scheduling information is needed than mentioned in
>    the doc?
>
>
>    - Commit number
>    - New data file number
>    - New data file size
>    - New delete file number
>    - Elapsed time since the last run
>
>
>    - How to prevent concurrent Maintenance Tasks?
>
>
>    - Just run, and retry - will clutter the logs with failures, and waste
>    resources
>    - External locking
>    - Using Iceberg table tags to prevent concurrent runs - mixing user
>    data with technical data
>    - We need a better solution :smile:
>
>
>
> I will be following up with these members if I don't see them reply in the
> following week. If you have no comment and you are on this list, please
> reply 'looks good' to acknowledge that you have seen it. Anyone is free to
> chime in; we just wanted a list of required eyes before assuming consensus.
>
> Péter
> Ryan Blue
> Daniel C. Weeks
> Russell Spitzer
> Steven Wu
> Bryan Keller
> Manu Zhang
> Ajantha Bhat
> Yanghao Lin
> Thanks everyone!!
>
> On Thu, Apr 11, 2024 at 7:13 AM wenjin <wenjin...@gmail.com> wrote:
>
>> Hi Peter,
>>
>> I am interested in your proposal, and with your help I have already
>> clarified some of my confusion in the Flink community - thanks for your
>> answers.
>>
>> I participated in yesterday's discussion, but since I am not a native
>> English speaker, I am concerned that I may have missed some details
>> regarding the follow-up actions for this proposal. So, if possible, please
>> involve me when there are further discussions or updates.
>>
>> Thanks,
>> wenjin
>>
>> On 2024/04/09 04:48:48 Péter Váry wrote:
>> > Forwarding the invite for the discussion we plan to do with the Iceberg
>> > folks, as some of you might be interested in this.
>> >
>> > ---------- Forwarded message ---------
>> > From: Brian Olsen <bi...@gmail.com>
>> > Date: Mon, Apr 8, 2024, 18:29
>> > Subject: Re: Flink table maintenance
>> > To: <de...@iceberg.apache.org>
>> >
>> >
>> > Hey Iceberg nation,
>> >
>> > I would like to share about the meeting this Wednesday to further
>> discuss
>> > details of Péter's proposal on Flink Maintenance Tasks.
>> > Calendar Link: https://calendar.app.google/83HGYWXoQJ8zXuVCA
>> >
>> > List discussion:
>> > https://lists.apache.org/thread/10mdf9zo6pn0dfq791nf4w1m7jh9k3sl
>> >
>> > Design Doc: Flink table maintenance
>> > https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit?usp=sharing
>> >
>> >
>> >
>> > On Mon, Apr 1, 2024 at 8:52 PM Manu Zhang <ow...@gmail.com> wrote:
>> >
>> > > Hi Peter,
>> > >
>> > >> Are you proposing to create a user facing locking feature in Iceberg,
>> > >> or just something for internal use?
>> > >>
>> > >
>> > > Since it's a general issue, I'm proposing to create a general user
>> > > interface first, while the implementation can be left to users. For
>> > > example, we use Airflow to schedule maintenance jobs and we can check
>> > > in-progress jobs with the Airflow API. Hive metastore lock might be
>> another
>> > > option we can implement for users.
>> > >
>> > > Thanks,
>> > > Manu
>> > >
>> > > On Tue, Apr 2, 2024 at 5:26 AM Péter Váry <pe...@gmail.com>
>> > > wrote:
>> > >
>> > >> Hi Ajantha,
>> > >>
>> > >> I thought about enabling post commit topology based compaction for
>> > >> sinks using options, like we use for the parametrization of streaming
>> > >> reads [1]. I think it will be hard to do it in a user friendly way -
>> > >> because of the high number of parameters - but I think it is a possible
>> > >> solution with sensible defaults.
>> > >>
>> > >> There is a batch-like solution for data file compaction already
>> available
>> > >> [2], but I do not see how we could extend Flink SQL to be able to
>> call it.
>> > >>
>> > >> Writing to a branch using Flink SQL should be another thread, but at
>> > >> first guess, it shouldn't be hard to implement using options, like:
>> > >> /*+ OPTIONS('branch'='b1') */
>> > >> since writing to a branch is already working through the Java API [3].
>> > >>
>> > >> Thanks, Peter
>> > >>
>> > >> 1 -
>> > >>
>> https://iceberg.apache.org/docs/latest/flink-queries/#flink-streaming-read
>> > >> 2 -
>> > >>
>> https://github.com/apache/iceberg/blob/820fc3ceda386149f42db8b54e6db9171d1a3a6d/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java
>> > >> 3 -
>> > >> https://iceberg.apache.org/docs/latest/flink-writes/#branch-writes
>> > >>
>> > >> On Mon, Apr 1, 2024, 16:30 Ajantha Bhat <aj...@gmail.com> wrote:
>> > >>
>> > >>> Thanks for the proposal Peter.
>> > >>>
>> > >>> I just wanted to know whether we have any plans for supporting SQL
>> > >>> syntax for table maintenance (like CALL procedures) for pure Flink SQL
>> > >>> users? I didn't see any custom SQL parser plugin support in Flink. I
>> > >>> also saw that branch writes don't have SQL support (only branch reads
>> > >>> use options), so I am not sure about the roadmap of Iceberg SQL
>> > >>> support in Flink.
>> > >>> Was there any discussion before?
>> > >>>
>> > >>> - Ajantha
>> > >>>
>> > >>> On Mon, Apr 1, 2024 at 7:51 PM Péter Váry <pe...@gmail.com>
>> > >>> wrote:
>> > >>>
>> > >>>> Hi Manu,
>> > >>>>
>> > >>>> Just to clarify:
>> > >>>> - Are you proposing to create a user facing locking feature in
>> > >>>> Iceberg, or just something for internal use?
>> > >>>>
>> > >>>> I think we shouldn't add locking to Iceberg's user facing scope at
>> > >>>> this stage. A fully featured locking system has many more features
>> > >>>> that we would need (priorities, fairness, timeouts etc). I could be
>> > >>>> tempted when we are talking about the REST catalog, but I think that
>> > >>>> should be further down the road, if ever...
>> > >>>>
>> > >>>> About using the tags:
>> > >>>> - I whole-heartedly agree that using tags is not intuitive, and I see
>> > >>>> your points in most of your arguments. OTOH, introducing a new
>> > >>>> requirement (a locking mechanism) seems like the wrong direction to me.
>> > >>>> - We already defined a requirement (atomic changes on the table) for
>> > >>>> the Catalog implementations, which could be used to achieve our goal
>> > >>>> here.
>> > >>>> - We also already store technical data in snapshot properties in Flink
>> > >>>> jobs (JobId, OperatorId, CheckpointId). Maybe technical tags/table
>> > >>>> properties are not a big stretch.
>> > >>>>
>> > >>>> Or we can look at these tags or metadata as 'technical data' which is
>> > >>>> internal to Iceberg, and shouldn't be expressed on the external API.
>> > >>>> My concern is:
>> > >>>> - Would it be used often enough to be worth the additional complexity?
>> > >>>>
>> > >>>> Knowing that Spark compaction is struggling with the same issue is a
>> > >>>> good indicator, but we would probably need more use cases before
>> > >>>> introducing a new feature with this complexity, or a simpler solution.
>> > >>>>
>> > >>>> Thanks, Peter
>> > >>>>
>> > >>>>
>> > >>>> On Mon, Apr 1, 2024, 10:18 Manu Zhang <ow...@gmail.com> wrote:
>> > >>>>
>> > >>>>>> What would the community think of exploiting tags for preventing
>> > >>>>>> concurrent maintenance loop executions?
>> > >>>>>
>> > >>>>>
>> > >>>>> This issue is not specific to Flink maintenance jobs. We have a
>> > >>>>> service scheduling Spark maintenance jobs by watching table
>> commits. When
>> > >>>>> we don't check in-progress maintenance jobs for the same table,
>> multiple
>> > >>>>> jobs will run concurrently and have conflicts.
>> > >>>>>
>> > >>>>> Basically, I think we need a lock mechanism like the metastore
>> lock
>> > >>>>> <
>> https://iceberg.apache.org/docs/nightly/configuration/#hadoop-configuration
>> >
>> > >>>>> if we want to handle it for users. However, using tags doesn't look
>> > >>>>> intuitive to me. We are also mixing user data with system metadata.
>> > >>>>> Maybe we can define some general interfaces and leave the
>> > >>>>> implementation to users in the first version.
>> > >>>>>
>> > >>>>> Regards,
>> > >>>>> Manu
>> > >>>>>
>> > >>>>>
>> > >>>>>
>> > >>>>> On Fri, Mar 29, 2024 at 1:59 PM Péter Váry <
>> > >>>>> peter.vary.apa...@gmail.com> wrote:
>> > >>>>>
>> > >>>>>> What would the community think of exploiting tags for preventing
>> > >>>>>> concurrent maintenance loop executions?
>> > >>>>>>
>> > >>>>>> The issue:
>> > >>>>>> Some maintenance tasks can't run in parallel, like DeleteOrphanFiles
>> > >>>>>> vs. ExpireSnapshots, or RewriteDataFiles vs. RewriteManifestFiles.
>> > >>>>>> We make sure not to run tasks started by a single trigger
>> > >>>>>> concurrently by serializing them, but there are no loops in Flink,
>> > >>>>>> so we can't synchronize tasks started by the next trigger.
>> > >>>>>>
>> > >>>>>> In the document, I describe that we need to rely on the user to
>> > >>>>>> ensure that the rate limit is high enough to prevent concurrent
>> triggers.
>> > >>>>>>
>> > >>>>>> Proposal:
>> > >>>>>> When firing a trigger, the RateLimiter could check for and create an
>> > >>>>>> Iceberg table tag [1] on the current table snapshot, with the name
>> > >>>>>> '__flink_maintenance'. When the execution finishes, we remove this
>> > >>>>>> tag. The RateLimiter keeps accumulating changes and doesn't fire new
>> > >>>>>> triggers while it finds this tag on the table.
>> > >>>>>> The solution relies on Flink 'forceNonParallel' to prevent concurrent
>> > >>>>>> execution of placing the tag, and on Iceberg to store it.
>> > >>>>>>
>> > >>>>>> This doesn't use tags as intended, but it seems like a better
>> > >>>>>> solution than adding/removing table properties, which would clutter
>> > >>>>>> the table history with technical data.
>> > >>>>>>
>> > >>>>>> Your thoughts? Any other suggestions/solutions would be welcome.
>> > >>>>>>
>> > >>>>>> Thanks,
>> > >>>>>> Peter
>> > >>>>>>
>> > >>>>>> [1]
>> > >>>>>>
>> https://iceberg.apache.org/docs/latest/java-api-quickstart/#branching-and-tagging
>> > >>>>>>
>> > >>>>>> On Thu, Mar 28, 2024, 14:44 Péter Váry <pe...@gmail.com>
>> > >>>>>> wrote:
>> > >>>>>>
>> > >>>>>>> Hi Team,
>> > >>>>>>>
>> > >>>>>>> As discussed on yesterday's community sync, I am working on adding
>> > >>>>>>> the possibility to run maintenance tasks on Iceberg tables to the
>> > >>>>>>> Flink Iceberg connector. This will fix the small files issue and,
>> > >>>>>>> in the long run, help compact the high number of positional and
>> > >>>>>>> equality deletes created by Flink jobs writing CDC data to Iceberg
>> > >>>>>>> tables, without needing Spark in the infrastructure.
>> > >>>>>>>
>> > >>>>>>> I did some planning and prototyping, and I am currently trying out
>> > >>>>>>> the solution on a larger scale.
>> > >>>>>>>
>> > >>>>>>> I put together a document describing what my current solution looks like:
>> > >>>>>>>
>> > >>>>>>>
>> https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit?usp=sharing
>> > >>>>>>>
>> > >>>>>>> I would love to hear your thoughts and feedback on this to find
>> a
>> > >>>>>>> good final solution.
>> > >>>>>>>
>> > >>>>>>> Thanks,
>> > >>>>>>> Peter
>> > >>>>>>>
>> > >>>>>>
>> >
>
>
