Hi,

Sorry for joining the discussion so late. I'm from the Flink community and
have done some work on SinkV2. I'd like to share some thoughts from the
Flink point of view. I'm quite new to Iceberg, so please feel free to
correct me if I've got anything wrong.

1. checkpointing
I think the maintenance tasks should not disturb the original job. In
particular, they should not block checkpointing; otherwise checkpoints may
time out and fail while a heavy task is running, which can cause the job to
fail. To achieve this, the operators executing the tasks need to either
split a heavy task into small ones, or execute the tasks asynchronously in
an executor pool instead of the main task thread and store the pending task
requests in the snapshots. This way snapshotting is guaranteed to finish in
a short time. However, if the tasks are implemented as complex topologies,
this may be impossible. In that case, I think it's better to start a
separate job instead of using the post commit topology.
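
To make the asynchronous option a bit more concrete, here is a rough sketch
in plain Java. It is not Flink code: AsyncMaintenanceRunner, snapshotState
and restoreState are hypothetical names that only mimic the shape of an
operator's snapshot and restore hooks, and requests are plain strings. The
point is that the snapshot call only copies the list of unfinished requests,
so it never waits for a running task.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical sketch, not a Flink API: runs maintenance requests off the main thread. */
class AsyncMaintenanceRunner {
    // A single worker thread, so requests still run one at a time, in order.
    private final ExecutorService pool = Executors.newSingleThreadExecutor();
    // Requests that were accepted but have not finished yet.
    private final Queue<String> pending = new ConcurrentLinkedQueue<>();
    private final Consumer<String> task;

    AsyncMaintenanceRunner(Consumer<String> task) {
        this.task = task;
    }

    /** Called from the main task thread; returns immediately. */
    void submit(String request) {
        pending.add(request);
        pool.execute(() -> {
            task.accept(request);
            // Only reached on success; a failed request stays pending.
            pending.remove(request);
        });
    }

    /** Snapshot hook: copies the unfinished requests without waiting for them. */
    List<String> snapshotState() {
        return new ArrayList<>(pending);
    }

    /** Restore hook: resubmits whatever was unfinished at the last snapshot. */
    void restoreState(List<String> requests) {
        requests.forEach(this::submit);
    }

    /** Stops accepting work and waits for queued requests to finish. */
    boolean shutdownAndAwait(long timeoutMillis) {
        pool.shutdown();
        try {
            return pool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Note that a request which was running when the snapshot was taken is
resubmitted after a restore, so in this sketch the tasks would have to be
idempotent.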

2. resources
Occupying lots of resources while the tasks are idle may be an issue, but
if the tasks are to be executed periodically inside the job, there seems to
be no perfect solution. All we can do is reduce the resource requirements.

On the other hand, Flink added support for the CALL statement in 1.18. I
have to say I'm not really familiar with it, but it looks like a way to
execute a task once. Maybe we can create a monitoring job (or do this in
the main function) that triggers tasks by executing CALL statements. This
avoids wasting resources, but it is quite a different direction indeed.

Besides, I remember that at the meeting it was proposed to adjust the
resources for maintenance tasks by autoscaling. I'm afraid it's not a good
idea to rely on it. As far as I know, the rescaling it triggers needs to
restart the job, which may not be acceptable to most users.

3. concurrency
If the table is only operated on by a single Flink job, we can use
OperatorCoordinator to coordinate the tasks. OperatorCoordinators may also
communicate with each other to ensure that different kinds of tasks are
executed in the expected way.

However, I suppose a lock mechanism is necessary if we want to coordinate
operations from different engines.
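
As a rough illustration of what such a lock could look like as a general
interface, here is a minimal sketch. MaintenanceLock and
InMemoryMaintenanceLock are hypothetical names, not existing Iceberg or
Flink classes. The in-memory implementation only works inside one JVM and is
there to show the contract; to coordinate different engines, the same
interface would have to be backed by something external that all engines can
reach (for example a metastore lock or a scheduler API).

```java
/** Hypothetical interface: one maintenance run at a time per table. */
interface MaintenanceLock {
    /** Tries to take the lock for the given owner; never blocks. */
    boolean tryLock(String owner);

    /** Releases the lock if it is held by the given owner. */
    boolean unlock(String owner);

    /** Runs the task only if the lock could be taken, and always releases it. */
    default boolean runExclusively(String owner, Runnable task) {
        if (!tryLock(owner)) {
            return false; // another maintenance run is in progress, skip this trigger
        }
        try {
            task.run();
        } finally {
            unlock(owner);
        }
        return true;
    }
}

/** Single-JVM stand-in for illustration only; does not coordinate across engines. */
class InMemoryMaintenanceLock implements MaintenanceLock {
    private String holder; // null means the lock is free

    @Override
    public synchronized boolean tryLock(String owner) {
        if (holder != null) {
            return false;
        }
        holder = owner;
        return true;
    }

    @Override
    public synchronized boolean unlock(String owner) {
        if (!owner.equals(holder)) {
            return false; // only the current holder may release the lock
        }
        holder = null;
        return true;
    }
}
```

A real implementation would also need to deal with owners that crash while
holding the lock (timeouts or similar), which this sketch ignores.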

(Am I replying to the correct mail?)

Thanks,
Gen

On Tue, Apr 9, 2024 at 12:29 AM Brian Olsen <bitsondata...@gmail.com> wrote:

> Hey Iceberg nation,
>
> I would like to share the details of the meeting this Wednesday, where we
> will further discuss Péter's proposal on Flink Maintenance Tasks.
> Calendar Link: https://calendar.app.google/83HGYWXoQJ8zXuVCA
>
> List discussion:
> https://lists.apache.org/thread/10mdf9zo6pn0dfq791nf4w1m7jh9k3sl
>
> Design Doc: Flink table maintenance
> <https://www.google.com/url?q=https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit?usp%3Dsharing&sa=D&source=calendar&usd=2&usg=AOvVaw1oLYQP76-G1ZEOW5pTxV1M>
>
>
>
> On Mon, Apr 1, 2024 at 8:52 PM Manu Zhang <owenzhang1...@gmail.com> wrote:
>
>> Hi Peter,
>>
>>> Are you proposing to create a user facing locking feature in Iceberg, or
>>> just something for internal use?
>>>
>>
>> Since it's a general issue, I'm proposing to create a general user
>> interface first, while the implementation can be left to users. For
>> example, we use Airflow to schedule maintenance jobs and we can check
>> in-progress jobs with the Airflow API. Hive metastore lock might be another
>> option we can implement for users.
>>
>> Thanks,
>> Manu
>>
>> On Tue, Apr 2, 2024 at 5:26 AM Péter Váry <peter.vary.apa...@gmail.com>
>> wrote:
>>
>>> Hi Ajantha,
>>>
>>> I thought about enabling post commit topology based compaction for sinks
>>> using options, like we use for the parametrization of streaming reads [1].
>>> I think it will be hard to do it in a user friendly way - because of the
>>> high number of parameters -, but I think it is a possible solution with
>>> sensible defaults.
>>>
>>> There is a batch-like solution for data file compaction already
>>> available [2], but I do not see how we could extend Flink SQL to be able to
>>> call it.
>>>
>>> Writing to a branch using Flink SQL should be another thread, but by my
>>> first guess, it shouldn't be hard to implement using options, like:
>>> /*+ OPTIONS('branch'='b1') */
>>> since writing to a branch is already working through the Java API [3].
>>>
>>> Thanks, Peter
>>>
>>> 1 -
>>> https://iceberg.apache.org/docs/latest/flink-queries/#flink-streaming-read
>>> 2 -
>>> https://github.com/apache/iceberg/blob/820fc3ceda386149f42db8b54e6db9171d1a3a6d/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java
>>> 3 -
>>> https://iceberg.apache.org/docs/latest/flink-writes/#branch-writes
>>>
>>> On Mon, Apr 1, 2024, 16:30 Ajantha Bhat <ajanthab...@gmail.com> wrote:
>>>
>>>> Thanks for the proposal Peter.
>>>>
>>>> I just wanted to know whether we have any plans for supporting SQL syntax
>>>> for table maintenance (like CALL procedure) for pure Flink SQL users.
>>>> I didn't see any custom SQL parser plugin support in Flink. I also saw
>>>> that Branch write doesn't have SQL support (only Branch reads use Option),
>>>> so I am not sure about the roadmap of Iceberg SQL support in Flink.
>>>> Was there any discussion before?
>>>>
>>>> - Ajantha
>>>>
>>>> On Mon, Apr 1, 2024 at 7:51 PM Péter Váry <peter.vary.apa...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Manu,
>>>>>
>>>>> Just to clarify:
>>>>> - Are you proposing to create a user facing locking feature in
>>>>> Iceberg, or just something for internal use?
>>>>>
>>>>> I think we shouldn't add locking to Iceberg's user facing scope at
>>>>> this stage. A fully featured locking system has many more features that we
>>>>> need (priorities, fairness, timeouts etc). I could be tempted when we are
>>>>> talking about the REST catalog, but I think that should be further down
>>>>> the road, if ever...
>>>>>
>>>>> About using the tags:
>>>>> - I whole-heartedly agree that using tags is not intuitive, and I see
>>>>> your points in most of your arguments. OTOH, introducing a new requirement
>>>>> (locking mechanism) seems like a wrong direction to me.
>>>>> - We already defined a requirement (atomic changes on the table) for
>>>>> the Catalog implementations which could be used to achieve our goal here.
>>>>> - We also already store technical data in snapshot properties in Flink
>>>>> jobs (JobId, OperatorId, CheckpointId). Maybe technical tags/table
>>>>> properties are not a big stretch.
>>>>>
>>>>> Or we can look at these tags or metadata as 'technical data' which is
>>>>> internal to Iceberg, and shouldn't be exposed on the external API. My
>>>>> concern is:
>>>>> - Would it be used often enough to be worth the additional complexity?
>>>>>
>>>>> Knowing that Spark compaction is struggling with the same issue is a
>>>>> good indicator, but probably we would need more use cases for
>>>>> introducing a new feature with this complexity, or a simpler solution.
>>>>>
>>>>> Thanks, Peter
>>>>>
>>>>>
>>>>> On Mon, Apr 1, 2024, 10:18 Manu Zhang <owenzhang1...@gmail.com> wrote:
>>>>>
>>>>>>> What would the community think of exploiting tags for preventing
>>>>>>> concurrent maintenance loop executions.
>>>>>>
>>>>>>
>>>>>> This issue is not specific to Flink maintenance jobs. We have a
>>>>>> service scheduling Spark maintenance jobs by watching table commits. When
>>>>>> we don't check in-progress maintenance jobs for the same table, multiple
>>>>>> jobs will run concurrently and have conflicts.
>>>>>>
>>>>>> Basically, I think we need a lock mechanism like the metastore lock
>>>>>> <https://iceberg.apache.org/docs/nightly/configuration/#hadoop-configuration>
>>>>>> if we want to handle it for users. However, using TAG doesn't look
>>>>>> intuitive to me. We are also mixing user data with system metadata.
>>>>>> Maybe we can define some general interfaces and leave the
>>>>>> implementation to users in the first version.
>>>>>>
>>>>>> Regards,
>>>>>> Manu
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Mar 29, 2024 at 1:59 PM Péter Váry <
>>>>>> peter.vary.apa...@gmail.com> wrote:
>>>>>>
>>>>>>> What would the community think of exploiting tags for preventing
>>>>>>> concurrent maintenance loop executions.
>>>>>>>
>>>>>>> The issue:
>>>>>>> Some maintenance tasks cannot run in parallel, like DeleteOrphanFiles
>>>>>>> vs. ExpireSnapshots, or RewriteDataFiles vs. RewriteManifestFiles. We
>>>>>>> make sure not to run tasks started by a single trigger concurrently by
>>>>>>> serializing them, but there are no loops in Flink, so we can't
>>>>>>> synchronize tasks started by the next trigger.
>>>>>>>
>>>>>>> In the document, I describe that we need to rely on the user to
>>>>>>> ensure that the rate limit is high enough to prevent concurrent
>>>>>>> triggers.
>>>>>>>
>>>>>>> Proposal:
>>>>>>> When firing a trigger, RateLimiter could check and create an Iceberg
>>>>>>> table tag [1] for the current table snapshot, with the name:
>>>>>>> '__flink_maitenance'. When the execution finishes we remove this tag.
>>>>>>> The RateLimiter keeps accumulating changes, and doesn't fire new
>>>>>>> triggers as long as it finds this tag on the table.
>>>>>>> The solution relies on Flink 'forceNonParallel' to prevent
>>>>>>> concurrent execution of placing the tag, and on Iceberg to store it.
>>>>>>>
>>>>>>> This does not use tags as intended, but seems like a better solution
>>>>>>> than adding/removing table properties, which would clutter the table
>>>>>>> history with technical data.
>>>>>>>
>>>>>>> Your thoughts? Any other suggestions/solutions would be welcome.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Peter
>>>>>>>
>>>>>>> [1]
>>>>>>> https://iceberg.apache.org/docs/latest/java-api-quickstart/#branching-and-tagging
>>>>>>>
>>>>>>> On Thu, Mar 28, 2024, 14:44 Péter Váry <peter.vary.apa...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Team,
>>>>>>>>
>>>>>>>> As discussed on yesterday's community sync, I am working on adding
>>>>>>>> a possibility to the Flink Iceberg connector to run maintenance tasks
>>>>>>>> on the Iceberg tables. This will fix the small files issues and in the
>>>>>>>> long run help compacting the high number of positional and equality
>>>>>>>> deletes created by Flink tasks writing CDC data to Iceberg tables,
>>>>>>>> without the need for Spark in the infrastructure.
>>>>>>>>
>>>>>>>> I did some planning and prototyping, and I am currently trying out the
>>>>>>>> solution on a larger scale.
>>>>>>>>
>>>>>>>> I put together a document describing what my current solution looks like:
>>>>>>>>
>>>>>>>> https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit?usp=sharing
>>>>>>>>
>>>>>>>> I would love to hear your thoughts and feedback on this to find a
>>>>>>>> good final solution.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Peter
>>>>>>>>
>>>>>>>
