If this is specific to solving the problem that there is no notification
when a task finishes in Flink, then I think it makes sense to use a JDBC
lock. I'd prefer that this not add the tag-based locking strategy, because I
think that has the potential to be misunderstood by people using the
library.
Hi Anton, nice to hear from you!
Thanks Ryan for your continued interest!
You can find my answers below:
> Am I right to say the proposal has the following high-level goals:
> - Perform cheap maintenance actions periodically after commits using the
same cluster (suitable for things like rewriting
> If nobody else has a better idea, then I will add a default JDBC-based
locking implementation to the PR
Do you mean an implementation of `LockManager` in core or something
specific to this Flink application?
On Tue, Aug 6, 2024 at 2:28 AM Péter Váry
wrote:
> > > We can make sure that the Task
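As an aside, the acquire/release semantics a JDBC-backed lock would need can be illustrated with a unique-key insert: whoever inserts the row first holds the lock. The following is a toy Python sketch using sqlite3 as a stand-in for a real database; the table name and method shapes are assumptions for illustration, not Iceberg's actual `LockManager` API.

```python
import sqlite3

class JdbcStyleLock:
    """Illustrative try-lock backed by a unique-key insert (sketch only)."""

    def __init__(self, conn):
        self.conn = conn
        conn.execute(
            "CREATE TABLE IF NOT EXISTS maintenance_lock ("
            "entity_id TEXT PRIMARY KEY, owner_id TEXT)")
        conn.commit()

    def acquire(self, entity_id, owner_id):
        # Atomic: the PRIMARY KEY rejects a second insert for the same entity.
        try:
            self.conn.execute(
                "INSERT INTO maintenance_lock (entity_id, owner_id) "
                "VALUES (?, ?)",
                (entity_id, owner_id))
            self.conn.commit()
            return True
        except sqlite3.IntegrityError:
            return False

    def release(self, entity_id, owner_id):
        # Only the current owner may release the lock.
        cur = self.conn.execute(
            "DELETE FROM maintenance_lock "
            "WHERE entity_id = ? AND owner_id = ?",
            (entity_id, owner_id))
        self.conn.commit()
        return cur.rowcount == 1
```

A real implementation would also need lease expiry and heartbeating so a crashed job does not hold the lock forever.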
Took a look at the doc as well as this thread.
Am I right to say the proposal has the following high-level goals:
- Perform cheap maintenance actions periodically after commits using the
same cluster (suitable for things like rewriting manifests, compacting tiny
data files).
- Offer an ability to
> If all the maintenance tasks are created from a single Flink job, is it
possible to simply skip a new maintenance task if there's already a running
task? The running maintenance tasks could be recorded in the JM?
The operator scheduling the tasks doesn't know when the actual tasks are
finished. We n
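The reply above boils down to a missing feedback channel: skip-if-running only works if something tells the scheduler when a task finishes, so it can clear its in-flight record. A toy sketch of that dependency (all names hypothetical):

```python
class MaintenanceScheduler:
    """Toy model: duplicates can only be skipped if a completion
    callback exists to clear the in-flight record."""

    def __init__(self):
        self.in_flight = set()

    def try_schedule(self, task_name):
        # Skip if the same task is already running.
        if task_name in self.in_flight:
            return False
        self.in_flight.add(task_name)
        return True

    def on_task_finished(self, task_name):
        # Without this feedback the task stays "running" forever.
        self.in_flight.discard(task_name)
```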
> DataFile rewrite will create a new manifest file. This means if a DataFile
> rewrite task is finished and committed, and there is a concurrent
> ManifestFile rewrite then the ManifestFile rewrite will fail. I have played
> around with serializing the Maintenance Tasks (resulted in a very ugly/
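The conflict described above is optimistic concurrency in miniature: a commit planned against one snapshot fails once a concurrent commit lands first. A toy model of just that failure mode (Iceberg's real commit path retries and can often reconcile; a manifest rewrite planned against a stale manifest list, however, has to be redone):

```python
class ToyTable:
    """Toy optimistic-concurrency model: a commit succeeds only if the
    snapshot it was planned against is still the current one."""

    def __init__(self):
        self.snapshot_id = 0

    def commit(self, base_snapshot_id):
        if base_snapshot_id != self.snapshot_id:
            return False  # a concurrent commit changed the metadata
        self.snapshot_id += 1
        return True
```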
> > We can make sure that the Tasks can tolerate concurrent runs, but as
mentioned in the doc, in most cases having concurrent runs is a waste of
resources because of the commit conflicts.
>
> Is the problem that users may configure multiple jobs that are all trying
to run maintenance procedures?
Hi Peter,
We rely on Airflow to schedule and coordinate maintenance Spark jobs. I
agree with Ryan that an Iceberg solution is not a good choice here.
Thanks,
Manu
On Tue, Aug 6, 2024 at 1:07 AM Ryan Blue
wrote:
> > We can make sure that the Tasks can tolerate concurrent runs, but as
> mentione
> We can make sure that the Tasks can tolerate concurrent runs, but as
mentioned in the doc, in most cases having concurrent runs is a waste of
resources because of the commit conflicts.
Is the problem that users may configure multiple jobs that are all trying
to run maintenance procedures? If s
Thanks everyone for your answers! I really appreciate it, especially since
these came in during the weekend, on your own time.
@Manu, during our initial discussion you mentioned that you had
similar issues with Spark compactions and needed locking there. Is it
still an issue?
If Spark
I'm not familiar with Flink, so I'm wondering how Flink resolves concurrency
issues in common Flink use cases. For example, how does Flink prevent two
jobs from writing to the same file?
On the other hand, an Iceberg tag is essentially an atomic change to a file.
It's the same as using a file lock. I don'
I also don't feel it is the best fit to use tags to implement locks for
passing control messages. This is the main sticking point for me from the
design doc. However, we haven't been able to come up with a better solution
yet. Maybe we need to go back to the drawing board again.
I am also not sure
Hi Péter, thanks for bringing this up.
I don't think using a tag to "lock" a table is a good idea. The doc calls
out that this is necessary "Since Flink doesn’t provide an out of the box
solution for downstream operators sending feedback to upstream operators",
so this feels like using Iceberg met
Hi Peter,
I'd like to clarify that using AsyncIO doesn't mean unaligned checkpoints
should not be used, just that they don't have to be. With unaligned
checkpoints and AsyncIO, the job can complete checkpointing in a guaranteed
time, while users can use aligned checkpoints if they don't want to enable
unaligned checkp
Hi Peter,
> Flink job doing a quick small file reduction after collecting several
commits
Triggering maintenance tasks as soon as possible is a valid goal to me,
but I'm not sure about its priority compared to maintenance tasks
that run with a delay of a few seconds.
> Single, cont
Hi Gen,
*Unaligned checkpoint & AsyncIO*
Let's talk about a concrete example: DataFileRewrite. The task has 3 steps:
1. Planning - this creates multiple RewriteFileGroups, each of which
contains the list of small files which should be compacted to a single new
file
2. Rewriting data f
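Step 1 (planning) amounts to packing small files into groups, each of which is then rewritten into roughly one new file. A greedy Python sketch of that idea (the real planner, e.g. a bin-pack rewrite strategy, is more involved; the function name and greedy policy here are illustrative assumptions):

```python
def plan_rewrite_groups(file_sizes, target_size):
    """Greedy sketch of planning: pack small files into groups whose
    total size stays at or under the target output file size."""
    groups, current, current_total = [], [], 0
    for size in sorted(file_sizes):
        if current and current_total + size > target_size:
            groups.append(current)       # close the full group
            current, current_total = [], 0
        current.append(size)
        current_total += size
    if current:
        groups.append(current)
    return groups
```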
Hi Peter,
Thanks for the reply and explanation!
> Unaligned checkpoint & AsyncIO
The issue only occurs when the inflight queue is full. Synchronous
operators cannot do checkpointing until the request being processed is
done, while async operators are always ready to do checkpointing, unless
the infli
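Gen's point can be modeled in miniature: an async operator only blocks its caller when the inflight queue has reached capacity, so it stays ready to checkpoint between elements. A toy asyncio model of that queuing behavior (not Flink's actual AsyncWaitOperator):

```python
import asyncio

async def async_operator(inputs, capacity, work):
    """Toy model of an async operator with a bounded inflight queue:
    the caller only blocks when `capacity` requests are already inflight."""
    inflight = asyncio.Semaphore(capacity)
    tasks = []

    async def run_one(value):
        try:
            return await work(value)
        finally:
            inflight.release()  # free a slot when the request completes

    for value in inputs:
        await inflight.acquire()  # backpressure only when the queue is full
        tasks.append(asyncio.create_task(run_one(value)))
    return await asyncio.gather(*tasks)
```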
Thanks Zhu and Gen for your reply!
*Unaligned checkpoints*
The maintenance tasks are big and complex. There are many parts which can
be parallelized to multiple tasks, so I think we should implement them with
chains of Flink Operators to exploit the advantages of Flink.
A single maintenance task c
Hi,
Thanks for the reply, Peter.
And thanks Zhu for joining the discussion.
> In the current design, using unaligned checkpoints, and splitting heavy
tasks to smaller ones could avoid maintenance tasks blocking the
checkpointing.
> How do you suggest executing asynchronous tasks
As far as I know
Hi Peter,
Thanks for starting this discussion.
I'm a bit uncertain about the necessity of using an extended sink topology
to run maintenance tasks. Creating a separate pipeline for the maintenance
monitor, scheduler, and tasks sounds like a better choice to me in most
cases. Advantages I can think of include
Hi Gen,
Thanks for your interest and thoughts!
See my answers and questions below:
Gen Luo wrote (on Wed, Apr 17, 2024, 20:32):
> Hi,
>
> Sorry for joining the discussion so late. I'm from the Flink community and
> did some work on SinkV2. I'd like to share some thoughts from
Hi,
Sorry for joining the discussion so late. I'm from the Flink community and
did some work on SinkV2. I'd like to share some thoughts from the Flink
point of view. While I'm quite new to Iceberg, please feel free to correct
me if I'm making any mistakes.
1. checkpointing
I think the maintenanc
- Technically, would it be possible not to force partition columns into the
PK? I believe this is possible, but probably less performant. It is mentioned
in the docs: https://iceberg.apache.org/spec/#scan-planning
From the documentation:
"An equality delete file must be applied to a data file when a
Hey Iceberg nation,
I would like to share information about the meeting this Wednesday to
further discuss details of Péter's proposal on Flink Maintenance Tasks.
Calendar Link: https://calendar.app.google/83HGYWXoQJ8zXuVCA
List discussion:
https://lists.apache.org/thread/10mdf9zo6pn0dfq791nf4w1m7jh9k3sl
Hi Peter,
Are you proposing to create a user facing locking feature in Iceberg, or
> just something for internal use?
>
Since it's a general issue, I'm proposing to create a general user
interface first, while the implementation can be left to users. For
example, we use Airflow to sched
Hi Ajantha,
I thought about enabling post commit topology based compaction for sinks
using options, like we use for the parametrization of streaming reads [1].
I think it will be hard to do in a user-friendly way because of the
high number of parameters, but I think it is a possible solutio
Thanks for the proposal Peter.
I just wanted to know: do we have any plans to support SQL syntax for
table maintenance (like CALL procedures) for pure Flink SQL users?
I didn't see any custom SQL parser plugin support in Flink. I also saw that
branch writes don't have SQL support (only branch r
Hi Manu,
Just to clarify:
- Are you proposing to create a user facing locking feature in Iceberg, or
just something for internal use?
I think we shouldn't add locking to Iceberg's user-facing scope at this
stage. A fully featured locking system has many more features that we need
(prior
>
> What would the community think of exploiting tags for preventing
> concurrent maintenance loop executions.
This issue is not specific to Flink maintenance jobs. We have a service
scheduling Spark maintenance jobs by watching table commits. When we don't
check in-progress maintenance jobs for
What would the community think of exploiting tags for preventing concurrent
maintenance loop executions.
The issue:
Some maintenance tasks can't run in parallel, like DeleteOrphanFiles vs.
ExpireSnapshots, or RewriteDataFiles vs. RewriteManifestFiles. We make
sure not to run tasks started by a si
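The incompatible pairs named above can be captured as a simple compatibility check. A sketch containing only the pairs mentioned in this message (the real compatibility rules may well be broader):

```python
# Pairs named in the thread that must not run concurrently (sketch; the
# actual rules may involve more tasks and conditions).
INCOMPATIBLE_PAIRS = {
    frozenset({"DeleteOrphanFiles", "ExpireSnapshots"}),
    frozenset({"RewriteDataFiles", "RewriteManifestFiles"}),
}

def can_run_concurrently(task_a, task_b):
    """True if the two maintenance tasks may safely overlap."""
    return frozenset({task_a, task_b}) not in INCOMPATIBLE_PAIRS
```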