Hi Wenjin,

See my answers below:
On Sat, Mar 30, 2024, 10:54 wenjin <wenjin...@gmail.com> wrote:

> Hi Peter,
>
> I am interested in your proposal and think making the Iceberg Flink
> Connector support running maintenance tasks is meaningful. If possible,
> could you help me clarify a few confusions?
>
> - When the Iceberg table is written by a single Flink job (use cases 1, 2),
> the maintenance tasks will be added to the post commit topology. How do
> the maintenance tasks execute? Synchronously or asynchronously? Will the
> maintenance tasks block the data processing of the Flink job?

The scheduling and maintenance tasks are just regular Flink operators. The scheduler also makes sure that the maintenance tasks are not chained to the Iceberg committer, so I would call this asynchronous. Flink operators do not block other operators, but the maintenance tasks do compete for resources with the other data processing tasks. That is why we provide the possibility to define slot sharing groups for the maintenance tasks. This allows the users to separate the provided resources as much as Flink allows.

I have seen only one exception to this separation: when we emit a high number of records in the maintenance flow, which can delay the start of the checkpoints. This can be mitigated by enabling unaligned checkpoints and using AsyncIO. There is one issue with AsyncIO found by Gyula Fora: https://issues.apache.org/jira/browse/FLINK-34704, which means that even with AsyncIO the checkpoint could be blocked until at least one compaction group is finished.

> - When the Iceberg table is written by multiple Flink jobs (use case 3),
> the user needs to create a separate Flink job to run the maintenance tasks.
> In this case, if the user does not create a separate job, but enables
> running the maintenance tasks in existing Flink jobs just like in use case
> 1, what would be the consequences? Or, is there an automatic mechanism to
> avoid this issue?
The user needs to create a new job, or choose a single job to run the maintenance tasks, to avoid running concurrent instances of the compaction tasks. Even if concurrent compaction tasks could be handled, they would be a serious waste of resources and would increase the likelihood of failing tasks due to concurrent changes on the table. So we do not plan to support this at the moment.

About the difference between the two scheduling methods:
- In cases 1-2, the scheduling information comes from the Iceberg committer - this works for a single writer.
- In case 3, the scheduling information comes from the monitor - this works for any number of writers.

So even if the maintenance tasks run in one of the jobs, when there are multiple writers the scheduling should be based on monitoring the changes on the table, instead of on the information coming from the committer (which could only contain the changes from a single writer).

I hope this helps,
Peter

> Thank you.
>
> Best,
> Wenjin
>
> On 2024/03/28 17:59:49 Péter Váry wrote:
> > Hi Team,
> >
> > I am working on adding a possibility to the Flink Iceberg connector to
> > run maintenance tasks on the Iceberg tables. This will fix the small
> > files issue and, in the long run, help compact the high number of
> > positional and equality deletes created by Flink tasks writing CDC data
> > to Iceberg tables, without the need for Spark in the infrastructure.
> >
> > I did some planning and prototyping, and I am currently trying out the
> > solution on a larger scale.
> >
> > I put together a document on how my current solution looks:
> > https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit?usp=sharing
> >
> > I would love to hear your thoughts and feedback on this to find a good
> > final solution.
> >
> > Thanks,
> > Peter
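[Editor's note: the unaligned-checkpoint mitigation mentioned in the thread is a standard Flink setting, not something specific to this proposal. A minimal flink-conf.yaml sketch, assuming Flink 1.13+ configuration keys (verify the key names against your Flink version's documentation):]

```yaml
# Sketch only: enable unaligned checkpoints so barriers can overtake
# buffered records, reducing checkpoint delays caused by a slow
# maintenance flow. The interval value is illustrative.
execution.checkpointing.interval: 60s
execution.checkpointing.unaligned: true
```

The same setting can also be enabled programmatically via `env.getCheckpointConfig().enableUnalignedCheckpoints()` on the `StreamExecutionEnvironment`.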