[jira] [Created] (FLINK-34982) FLIP-428: Fault Tolerance/Rescale Integration for Disaggregated State

2024-04-01 Thread Jinzhong Li (Jira)
Jinzhong Li created FLINK-34982:
---

 Summary: FLIP-428: Fault Tolerance/Rescale Integration for 
Disaggregated State
 Key: FLINK-34982
 URL: https://issues.apache.org/jira/browse/FLINK-34982
 Project: Flink
  Issue Type: New Feature
  Components: Runtime / Checkpointing, Runtime / State Backends
Reporter: Jinzhong Li
 Fix For: 2.0.0


This is a sub-FLIP of the disaggregated state management work; please read 
[FLIP-423|https://cwiki.apache.org/confluence/x/R4p3EQ] first to get the whole 
story.

As outlined in [FLIP-423|https://cwiki.apache.org/confluence/x/R4p3EQ] [1] and 
[FLIP-427|https://cwiki.apache.org/confluence/x/T4p3EQ] [2], we proposed to 
disaggregate StateManagement and introduced a disaggregated state storage named 
ForSt, which evolves from RocksDB. Within the new framework, where the primary 
storage is placed on the remote file system, several challenges emerge when 
attempting to reuse the existing fault-tolerance mechanisms of local RocksDB:
 * Because most remote file systems don't support hard links, ForSt can't use 
hard links to capture a consistent snapshot during the checkpoint synchronous 
phase as RocksDB currently does.
 * The existing file transfer mechanism within RocksDB is inefficient during 
checkpoints: it first downloads the remote working state data to the local 
machine and then uploads it to the checkpoint directory. Likewise, restore and 
rescale face similar problems due to superfluous data transmission.

In order to solve the above problems and improve the checkpoint/restore/rescaling 
performance of disaggregated storage, this FLIP proposes:
 # A new checkpoint strategy for disaggregated state storage: leverage 
RocksDB's low-level API to retain a consistent snapshot during the checkpoint 
synchronous phase, and then transfer the snapshot files to the checkpoint 
directory during the asynchronous phase;
 # Accelerating checkpoint/restore/rescaling by leveraging the fast-duplication 
capability of remote file systems, which can bypass the local TaskManager when 
transferring data between the remote working directory and the checkpoint 
directory (see the sketch below).
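
For illustration, the intended flow could look roughly like the sketch below. 
All names used here (ForStStore, NativeSnapshot, retainNativeSnapshot, 
fastDuplicate) are assumptions made for illustration only, not the actual 
FLIP-428 API:

```
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

// All types below are illustrative assumptions, not the FLIP-428 API.
interface NativeSnapshot { List<Path> files(); }
interface RemoteFileSystem { void fastDuplicate(Path source, Path target); }
interface ForStStore { NativeSnapshot retainNativeSnapshot(); }

class ForStSnapshotSketch {
    private final ForStStore forSt;
    private final RemoteFileSystem remoteFs;
    private final Path checkpointDir;

    ForStSnapshotSketch(ForStStore forSt, RemoteFileSystem remoteFs, Path checkpointDir) {
        this.forSt = forSt;
        this.remoteFs = remoteFs;
        this.checkpointDir = checkpointDir;
    }

    RunnableFuture<NativeSnapshot> snapshot(long checkpointId) {
        // Synchronous phase: pin a consistent set of immutable files via the
        // store's low-level snapshot API; no hard links are required.
        NativeSnapshot snapshot = forSt.retainNativeSnapshot();

        // Asynchronous phase: duplicate the pinned files from the remote
        // working directory into the checkpoint directory. With server-side
        // fast duplication, no bytes flow through the local TaskManager.
        return new FutureTask<>(() -> {
            for (Path file : snapshot.files()) {
                remoteFs.fastDuplicate(file, checkpointDir.resolve(file.getFileName()));
            }
            return snapshot;
        });
    }
}
```

The point of the sketch is that the synchronous phase only pins immutable 
files, while the asynchronous phase can complete entirely on the remote file 
system.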



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34983) Hive param sink.partition-commit.policy.kind default value does not work in streaming mode

2024-04-01 Thread yunfan (Jira)
yunfan created FLINK-34983:
--

 Summary: Hive param sink.partition-commit.policy.kind default 
value does not work in streaming mode
 Key: FLINK-34983
 URL: https://issues.apache.org/jira/browse/FLINK-34983
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hive
Reporter: yunfan


PR https://github.com/apache/flink/pull/20469/ added a default value for the 
Hive parameter {{sink.partition-commit.policy.kind}}, but the default does not 
take effect in streaming mode, because an earlier PR 
[https://github.com/apache/flink/pull/16370] checks whether this parameter has 
been explicitly set.
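
A minimal sketch of the conflict (simplified; not the actual Hive connector 
code, and the default value shown here is an assumption):

```
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

// Simplified illustration of the conflict between the two PRs.
public class PolicyKindSketch {

    static final ConfigOption<String> POLICY_KIND =
            ConfigOptions.key("sink.partition-commit.policy.kind")
                    .stringType()
                    .defaultValue("metastore,success-file"); // assumed default

    public static void main(String[] args) {
        Configuration conf = new Configuration(); // user sets nothing

        // The streaming code path validates with contains(), which only sees
        // explicitly set values, so the default never takes effect there:
        System.out.println(conf.contains(POLICY_KIND)); // false

        // Reading through the option would apply the default:
        System.out.println(conf.get(POLICY_KIND)); // metastore,success-file
    }
}
```

Because Configuration#contains only reports explicitly set values while 
Configuration#get falls back to the default, a guard written against 
contains() never sees the default added by the later PR.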

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-399: Flink Connector Doris

2024-04-01 Thread wudi
Hi,

Gentle ping to see if there are any other concerns or things that seem missing 
from the FLIP.

Brs
di.wu

> 2024年3月25日 17:52,Feng Jin  写道:
> 
> Hi Di
> 
> Thank you for your patience and explanation.
> 
> If this is a server-side configuration, we currently cannot modify it in
> the client configuration. If Doris supports client-side configuration in
> the future, we can reconsider whether to support it.
> 
> I currently have no other questions regarding this FLIP.  LGTM.
> 
> 
> Best,
> Feng
> 
> On Mon, Mar 25, 2024 at 3:42 PM wudi <676366...@qq.com.invalid> wrote:
> 
>> Hi, Feng
>> 
>> Yes, if the StreamLoad transaction timeout is very short, you may
>> encounter this situation.
>> 
>> The timeout for StreamLoad transactions is controlled by the
>> streaming_label_keep_max_second parameter [1] in FE (Frontend), and the
>> default value is 12 hours. Currently, it is a global transaction
>> configuration and cannot be set separately for a specific transaction.
>> 
>> However, I understand the default 12-hour timeout should cover most cases
>> unless you are restarting from a checkpoint that occurred a long time ago.
>> What do you think?
>> 
>> 
>> [1]
>> https://github.com/apache/doris/blob/master/fe/fe-common/src/main/java/org/apache/doris/common/Config.java#L163-L168
>> 
>> 
>> Brs
>> di.wu
>> 
>>> 2024年3月25日 11:45,Feng Jin  写道:
>>> 
>>> Hi Di
>>> 
>>> Thanks for your reply.
>>> 
>>> The timeout I'm referring to here is not the commit timeout, but rather
>> the
>>> timeout for a single streamLoad transaction.
>>> 
>>> Let's say we have set the transaction timeout for StreamLoad to be 10
>>> minutes. Now, imagine there is a Flink job with two subtasks. Due to
>>> significant data skew and backpressure issues, subtask 0 and subtask 1
>> are
>>> processing at different speeds. Subtask 0 finishes processing this
>>> checkpoint first, while subtask 1 takes another 10 minutes to complete
>> its
>>> processing. At this point, the job's checkpoint is done. However, since
>>> subtask 0 has been waiting for subtask 1 all along, its corresponding
>>> streamLoad transaction closes after more than 10 minutes have passed - by
>>> which time the server has already cleaned up this transaction, leading
>> to a
>>> failed commit.
>>> Therefore, I would like to know if in such situations we can avoid this
>>> problem by setting a longer lifespan for transactions.
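
For illustration, this failure window can be made concrete with a small sketch 
(the numbers are made up; this is not connector code):

```
// Illustrative timeline of the race between checkpoint completion and the
// server-side StreamLoad transaction timeout.
public class TxnTimeoutSketch {
    public static void main(String[] args) {
        long txnTimeoutMs = 10 * 60 * 1000L;        // server-side txn timeout

        long preCommitAtMs = 0L;                    // subtask 0 pre-commits
        long checkpointDoneAtMs = 11 * 60 * 1000L;  // subtask 1 lags 11 minutes

        // The commit RPC is only sent on notifyCheckpointComplete, so if the
        // gap exceeds the server-side timeout, the FE has already cleaned up
        // the transaction and the commit fails.
        boolean commitFails = (checkpointDoneAtMs - preCommitAtMs) > txnTimeoutMs;
        System.out.println("commit fails: " + commitFails); // true
    }
}
```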
>>> 
>>> 
>>> Best,
>>> Feng
>>> 
>>> 
>>> On Fri, Mar 22, 2024 at 10:24 PM wudi <676366...@qq.com.invalid> wrote:
>>> 
 Hi, Feng,
 
 1. Are you suggesting that when a commit gets stuck, we can interrupt
>> the
 commit request using a timeout parameter? Currently, there is no such
 parameter. In my understanding, in a two-phase commit, checkpoint must
>> be
 enabled, so the commit timeout is essentially the checkpoint timeout.
 Therefore, it seems unnecessary to add an additional parameter here.
>> What
 do you think?
 
 2. In addition to deleting checkpoints to re-consume data again, the
 Connector also provides an option to ignore commit errors[1]. However,
>> this
 option is only used for error recovery scenarios, such as when a
 transaction is cleared by the server but you want to reuse the upstream
 offset from the checkpoint.
 
 3. Also, thank you for pointing out the issue with the parameter. It has
 already been addressed[2], but the FLIP changes were overlooked. It has
 been updated.
 
 [1]
 
>> https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java#L150-L160
 [2]
 
>> https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java#L89-L98
 
 Brs
 di.wu
 
 
 
> 2024年3月22日 18:28,Feng Jin  写道:
> 
> Hi Di,
> 
> Thank you for the update, as well as quickly implementing corresponding
> capabilities including filter push down and project push down.
> 
> Regarding the transaction timeout, I still have some doubts. I would like
> to confirm if we can control this timeout parameter in the connector, such
> as setting it to 10 minutes or 1 hour.
> Also, when a transaction is cleared by the server, the commit operation of
> the connector will fail, leading to job failure. In this case, can users
> only choose to delete the checkpoint and re-consume historical data?
>
> There is also a small question regarding the parameters
> doris.request.connect.timeout.ms and doris.request.read.timeout.ms:
> can we change them to Duration type and remove the "ms" suffix?
> This way, all time parameters can be kept uniform in type as duration.
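
For illustration, Duration-typed options could be declared roughly like this 
with Flink's ConfigOptions builder (a sketch; the default values are 
assumptions):

```
import java.time.Duration;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

// Sketch only: option keys follow the discussion above with the "ms" suffix
// dropped; the default values are assumptions.
public class DorisTimeoutOptionsSketch {

    public static final ConfigOption<Duration> CONNECT_TIMEOUT =
            ConfigOptions.key("doris.request.connect.timeout")
                    .durationType()
                    .defaultValue(Duration.ofSeconds(30));

    public static final ConfigOption<Duration> READ_TIMEOUT =
            ConfigOptions.key("doris.request.read.timeout")
                    .durationType()
                    .defaultValue(Duration.ofSeconds(30));
}
```

Users could then supply values such as '30s' or '1 min' uniformly for all 
time parameters.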
> 
> 
> Best,
> Feng
>

Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-04-01 Thread Jeyhun Karimov
Hi everyone,

Thanks for your valuable feedback!

The discussion on this FLIP has been going on for a while.
I would like to start a vote after 48 hours.

Please let me know if you have any concerns or any further
questions/comments.

Regards,
Jeyhun


On Thu, Mar 21, 2024 at 6:01 PM Jeyhun Karimov  wrote:

> Hi Lorenzo,
>
> Thanks a lot for your comments. Please find my answers below:
>
>
> For the interface `SupportsPartitioning`, why return `Optional`?
>> If one decides to implement that, partitions must exist (at maximum,
>> return an empty list). Returning `Optional` seems just to complicate the
>> logic of the code using that interface.
>
>
> - The reasoning behind the use of Optional is that sometimes (e.g., in
> HiveTableSource) the partitioning info is in the catalog.
>   Therefore, we return Optional.empty(), so that the list of partitions is
> queried from the catalog.
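
For illustration, the shape under discussion is roughly the following (a 
sketch; the method name and the partition representation are assumptions, not 
the FLIP's final API):

```
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Sketch of the discussed interface; names are assumptions.
public interface SupportsPartitioning {

    /**
     * Returns the known partitions, or Optional.empty() when the partition
     * metadata lives elsewhere (e.g. in the catalog, as for HiveTableSource)
     * and has to be queried from there by the planner.
     */
    Optional<List<Map<String, String>>> listPartitions();
}
```

Optional.empty() then signals "ask the catalog", while an empty list would 
mean the source knows it has no partitions.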
>
>
> I foresee the calling code doing something like: "if the source supports
>> partitioning, get the partitions, but if they don't exist, raise a runtime
>> exception". Let's simply make that safe at compile time and guarantee the
>> code that partitions exist.
>
>
> - Yes, if partitions cannot be found, neither from the catalog nor from the
> interface implementation, then we raise an exception during query compile
> time.
>
>
>  Another thing is that you show Hive-like partitioning in your FS
>> structure, do you think it makes sense to add a note about auto-discovery
>> of partitions?
>
>
> - Yes, the FLIP contains just an example partitioning for the filesystem
> connector. Each connector already "knows" about auto-discovery of its
> partitions, and we rely on this fact.
>   For example, partition discovery is different between Kafka and
> filesystem sources. So, we do not handle the manual discovery of
> partitions. Please correct me if I misunderstood your question.
>
>
> In other terms, it looks a bit counterintuitive that the user implementing
>> the source has to specify which partitions exist statically (and they can
>> change at runtime), while the source itself knows the data provider and can
>> directly implement a method `discoverPartitions`. Then Flink would take
>> care of invoking that method when needed.
>
>
> We utilize table option SOURCE_MONITOR_INTERVAL to check whether
> partitions are static or not. So, a user should still give Flink a hint
> about partitions being static or not. With static partitions Flink can do
> more optimizations.
>
> Please let me know if my replies answer your questions or if you have more
> comments.
>
> Regards,
> Jeyhun
>
>
>
> On Thu, Mar 21, 2024 at 10:03 AM  wrote:
>
>> Hello Jeyhun,
>> I really like the proposal, and it definitely makes sense to me.
>>
>> I have a couple of nits here and there:
>>
>> For the interface `SupportsPartitioning`, why return `Optional`?
>> If one decides to implement that, partitions must exist (at maximum,
>> return an empty list). Returning `Optional` seems just to complicate the
>> logic of the code using that interface.
>>
>> I foresee the calling code doing something like: "if the source supports
>> partitioning, get the partitions, but if they don't exist, raise a runtime
>> exception". Let's simply make that safe at compile time and guarantee the
>> code that partitions exist.
>>
>> Another thing is that you show Hive-like partitioning in your FS
>> structure, do you think it makes sense to add a note about auto-discovery
>> of partitions?
>>
>> In other terms, it looks a bit counterintuitive that the user
>> implementing the source has to specify which partitions exist statically
>> (and they can change at runtime), while the source itself knows the data
>> provider and can directly implement a method `discoverPartitions`. Then
>> Flink would take care of invoking that method when needed.
>> On Mar 15, 2024 at 22:09 +0100, Jeyhun Karimov ,
>> wrote:
>>
>> Hi Benchao,
>>
>> Thanks for your comments.
>>
>> 1. What parallelism would you take? E.g., 128 + 256 => 128? What
>>
>> if we cannot have a good greatest common divisor, like 127 + 128,
>> could we just utilize one side's pre-partitioned attribute, and let
>> the other side just do the shuffle?
>>
>>
>>
>> There are two cases we need to consider:
>>
>> 1. Static Partition (no partitions are added during the query execution)
>> is
>> enabled AND both sources implement "SupportsPartitionPushdown"
>>
>> In this case, we are sure that no new partitions will be added at runtime.
>> So, we have a chance to equalize both sources' partitions and parallelism,
>> IFF both sources implement the "SupportsPartitionPushdown" interface.
>> To achieve so, first we will fetch the existing partitions from source1
>> (say p_s1) and from source2 (say p_s2).
>> Then, we find the intersection of these two partition sets (say
>> p_intersect) and pushdown these partitions:
>>
>> SupportsPartitionPushDown::applyPartitions(p_intersect) // make sure that
>> only specific partitions are read
>> SupportsPartitioning::applyPartit
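
A compact sketch of the intersection step described above (illustrative only; 
the partition type is kept generic):

```
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Illustrative helper: equalize the partition sets of two join inputs before
// pushing the common partitions down to both sources.
class PartitionIntersectionSketch {
    static <P> List<P> intersect(List<P> pS1, List<P> pS2) {
        Set<P> second = new HashSet<>(pS2);
        return pS1.stream().filter(second::contains).collect(Collectors.toList());
    }
}
// The planner would then call
// SupportsPartitionPushDown::applyPartitions(p_intersect) on both sources,
// as described above.
```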

Re: Re: [ANNOUNCE] Apache Paimon is graduated to Top Level Project

2024-04-01 Thread Jeyhun Karimov
Congratulations!

Regards,
Jeyhun

On Mon, Apr 1, 2024 at 7:43 AM Guowei Ma  wrote:

> Congratulations!
> Best,
> Guowei
>
>
> On Mon, Apr 1, 2024 at 11:15 AM Feng Jin  wrote:
>
> > Congratulations!
> >
> > Best,
> > Feng Jin
> >
> > On Mon, Apr 1, 2024 at 10:51 AM weijie guo 
> > wrote:
> >
> >> Congratulations!
> >>
> >> Best regards,
> >>
> >> Weijie
> >>
> >>
> >> Hang Ruan  于2024年4月1日周一 09:49写道:
> >>
> >> > Congratulations!
> >> >
> >> > Best,
> >> > Hang
> >> >
> >> > Lincoln Lee  于2024年3月31日周日 00:10写道:
> >> >
> >> > > Congratulations!
> >> > >
> >> > > Best,
> >> > > Lincoln Lee
> >> > >
> >> > >
> >> > > Jark Wu  于2024年3月30日周六 22:13写道:
> >> > >
> >> > > > Congratulations!
> >> > > >
> >> > > > Best,
> >> > > > Jark
> >> > > >
> >> > > > On Fri, 29 Mar 2024 at 12:08, Yun Tang  wrote:
> >> > > >
> >> > > > > Congratulations to all Paimon guys!
> >> > > > >
> >> > > > > Glad to see a Flink sub-project has been graduated to an Apache
> >> > > top-level
> >> > > > > project.
> >> > > > >
> >> > > > > Best
> >> > > > > Yun Tang
> >> > > > >
> >> > > > > 
> >> > > > > From: Hangxiang Yu 
> >> > > > > Sent: Friday, March 29, 2024 10:32
> >> > > > > To: dev@flink.apache.org 
> >> > > > > Subject: Re: Re: [ANNOUNCE] Apache Paimon is graduated to Top
> >> Level
> >> > > > Project
> >> > > > >
> >> > > > > Congratulations!
> >> > > > >
> >> > > > > On Fri, Mar 29, 2024 at 10:27 AM Benchao Li <
> libenc...@apache.org
> >> >
> >> > > > wrote:
> >> > > > >
> >> > > > > > Congratulations!
> >> > > > > >
> >> > > > > > Zakelly Lan  于2024年3月29日周五 10:25写道:
> >> > > > > > >
> >> > > > > > > Congratulations!
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > Best,
> >> > > > > > > Zakelly
> >> > > > > > >
> >> > > > > > > On Thu, Mar 28, 2024 at 10:13 PM Jing Ge
> >> > >  >> > > > >
> >> > > > > > wrote:
> >> > > > > > >
> >> > > > > > > > Congrats!
> >> > > > > > > >
> >> > > > > > > > Best regards,
> >> > > > > > > > Jing
> >> > > > > > > >
> >> > > > > > > > On Thu, Mar 28, 2024 at 1:27 PM Feifan Wang <
> >> > zoltar9...@163.com>
> >> > > > > > wrote:
> >> > > > > > > >
> >> > > > > > > > > Congratulations!——
> >> > > > > > > > >
> >> > > > > > > > > Best regards,
> >> > > > > > > > >
> >> > > > > > > > > Feifan Wang
> >> > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > > > At 2024-03-28 20:02:43, "Yanfei Lei" <
> fredia...@gmail.com
> >> >
> >> > > > wrote:
> >> > > > > > > > > >Congratulations!
> >> > > > > > > > > >
> >> > > > > > > > > >Best,
> >> > > > > > > > > >Yanfei
> >> > > > > > > > > >
> >> > > > > > > > > >Zhanghao Chen 
> 于2024年3月28日周四
> >> > > > 19:59写道:
> >> > > > > > > > > >>
> >> > > > > > > > > >> Congratulations!
> >> > > > > > > > > >>
> >> > > > > > > > > >> Best,
> >> > > > > > > > > >> Zhanghao Chen
> >> > > > > > > > > >> 
> >> > > > > > > > > >> From: Yu Li 
> >> > > > > > > > > >> Sent: Thursday, March 28, 2024 15:55
> >> > > > > > > > > >> To: d...@paimon.apache.org 
> >> > > > > > > > > >> Cc: dev ; user <
> >> > u...@flink.apache.org
> >> > > >
> >> > > > > > > > > >> Subject: Re: [ANNOUNCE] Apache Paimon is graduated to
> >> Top
> >> > > > Level
> >> > > > > > > > Project
> >> > > > > > > > > >>
> >> > > > > > > > > >> CC the Flink user and dev mailing list.
> >> > > > > > > > > >>
> >> > > > > > > > > >> Paimon originated within the Flink community,
> initially
> >> > > known
> >> > > > as
> >> > > > > > Flink
> >> > > > > > > > > >> Table Store, and all our incubating mentors are
> >> members of
> >> > > the
> >> > > > > > Flink
> >> > > > > > > > > >> Project Management Committee. I am confident that the
> >> > bonds
> >> > > of
> >> > > > > > > > > >> enduring friendship and close collaboration will
> >> continue
> >> > to
> >> > > > > > unite the
> >> > > > > > > > > >> two communities.
> >> > > > > > > > > >>
> >> > > > > > > > > >> And congratulations all!
> >> > > > > > > > > >>
> >> > > > > > > > > >> Best Regards,
> >> > > > > > > > > >> Yu
> >> > > > > > > > > >>
> >> > > > > > > > > >> On Wed, 27 Mar 2024 at 20:35, Guojun Li <
> >> > > > > gjli.schna...@gmail.com>
> >> > > > > > > > > wrote:
> >> > > > > > > > > >> >
> >> > > > > > > > > >> > Congratulations!
> >> > > > > > > > > >> >
> >> > > > > > > > > >> > Best,
> >> > > > > > > > > >> > Guojun
> >> > > > > > > > > >> >
> >> > > > > > > > > >> > On Wed, Mar 27, 2024 at 5:24 PM wulin <
> >> > > ouyangwu...@163.com>
> >> > > > > > wrote:
> >> > > > > > > > > >> >
> >> > > > > > > > > >> > > Congratulations~
> >> > > > > > > > > >> > >
> >> > > > > > > > > >> > > > 2024年3月27日 15:54,王刚  >> > .INVALID>
> >> > > > 写道:
> >> > > > > > > > > >> > > >
> >> > > > > > > > > >> > > > Congratulations~
> >> > > > > > > > > >> > > >
> >> > > > > > > > > >> > > >> 2024年3月26日 10:25,Jingsong Li <
> >> > jingsongl...@gmail.com
> >> > > >
> >> > > > > 写道:
> >> > > > > > > > > >> > > >>
> >> > > > > > > > > >> > > >> Hi 

Re: [VOTE] FLIP-423: Disaggregated State Storage and Management (Umbrella FLIP)

2024-04-01 Thread Yuan Mei
+1 from myself, and thanks to all for voting.

I'm closing the vote and the result will be posted in a separate mail.

Best

Yuan

On Fri, Mar 29, 2024 at 3:07 PM yue ma  wrote:

> +1 (non-binding)
>
>
> Best,
> Yue
>


[RESULT][VOTE] FLIP-423: Disaggregated State Storage and Management (Umbrella FLIP)

2024-04-01 Thread Yuan Mei
Hey dev,

I'm happy to announce that FLIP-423: Disaggregated State Storage and
Management (Umbrella FLIP) [1] has been accepted with 7 approving votes (5
binding) [2]

Piotrek Nowojski (binding)
Feifan Wang (non-binding)
Jing Ge (binding)
Rui Fan (binding)
Xintong (binding)
Yue (non-binding)
Yuan Mei (binding)

Best
Yuan

[1] https://cwiki.apache.org/confluence/x/R4p3EQ
[2] https://www.mail-archive.com/dev@flink.apache.org/msg74884.html


[jira] [Created] (FLINK-34984) Disaggregated State Storage and Management (Umbrella FLIP)

2024-04-01 Thread Yuan Mei (Jira)
Yuan Mei created FLINK-34984:


 Summary: Disaggregated State Storage and Management (Umbrella FLIP)
 Key: FLINK-34984
 URL: https://issues.apache.org/jira/browse/FLINK-34984
 Project: Flink
  Issue Type: New Feature
  Components: API / Core, API / DataStream, Runtime / Checkpointing, 
Runtime / State Backends
Reporter: Yuan Mei






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-XXX: Introduce Flink SQL variables

2024-04-01 Thread Jeyhun Karimov
Hi Ferenc,

Thanks for the proposal. Sounds like a good idea!
I have a few questions on that:

- What is the impact on query optimization? Resolving variables at the
parsing stage might affect how queries are optimized.

- What is the scope of variables? When and how do they override each
other, and when do they go out of scope?

- Does the proposal support dynamic assignment of variables, or must the
values of variables be known at query compile time?

- Can we somehow benefit from/leverage Calcite's parameterization feature
in this proposal?

Regards,
Jeyhun

On Thu, Mar 28, 2024 at 6:21 PM Ferenc Csaky 
wrote:

> Hi, Jim, Yanfei,
>
> Thanks for your comments! Let me reply in the order of the
> messages.
>
> > I'd prefer sticking to the SQL standard if possible.  Would
> > it be possible / sensible to allow for each syntax, perhaps
> > managed by a config setting?
>
> Correct me if I am wrong, but AFAIK variables are not part of
> the ANSI SQL standard. The '@' prefix is used by some widely
> used database systems, e.g. MySQL.
>
> Regarding having multiple resolution syntax, it would be possible,
> if we agree it adds value. Personally I do not have a strong
> opinion on that.
>
>
> > I'm new to Flink SQL and I'm curious if these variables can be
> > calculated from statements or expression [1]?
>
> Good point! The proposed solution would lack this functionality.
> On our platform, we have a working solution of this that was
> sufficient to solve our main problem: carrying SQL between
> environments without change.
>
> At this point, variable values can only be literals, and they are
> automatically escaped during resolution. Except if they are
> resolved as a DDL statement property value.
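
For illustration, this early string-level resolution could look roughly like 
the following sketch (the ${name} placeholder syntax and the escaping rule are 
assumptions, not the FLIP's final design):

```
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative only: resolve variables on the raw statement string before it
// reaches the parser.
public class SqlVariableResolverSketch {

    private static final Pattern VAR =
            Pattern.compile("\\$\\{([A-Za-z_][A-Za-z0-9_]*)\\}");

    public static String resolve(String statement, Map<String, String> variables) {
        Matcher matcher = VAR.matcher(statement);
        StringBuffer resolved = new StringBuffer();
        while (matcher.find()) {
            String value = variables.get(matcher.group(1));
            if (value == null) {
                throw new IllegalArgumentException(
                        "Undefined variable: " + matcher.group(1));
            }
            // Escape single quotes so literal values stay literals.
            matcher.appendReplacement(
                    resolved, Matcher.quoteReplacement(value.replace("'", "''")));
        }
        matcher.appendTail(resolved);
        return resolved.toString();
    }
}
```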
>
> But if the community agrees that it would be useful to support
> calculated variables, I would happily spend some time on possible
> solutions that make sense in Flink.
>
> WDYT?
>
> Best,
> Ferenc
>
>
>
> On Thursday, March 28th, 2024 at 03:58, Yanfei Lei 
> wrote:
>
> >
> >
> > Hi Ferenc,
> >
> > Thanks for the proposal, using SQL variables to exclude
> > environment-specific configuration from code sounds like a good idea.
> >
> > I'm new to Flink SQL and I'm curious if these variables can be
> > calculated from statements or expression [1]? In FLIP, it seems that
> > the values are in the form of StringLiteral.
> >
> >
> > [1]
> https://docs.databricks.com/en/sql/language-manual/sql-ref-syntax-aux-set-variable.html
> >
> > Jim Hughes jhug...@confluent.io.invalid 于2024年3月28日周四 04:54写道:
> >
> > > Hi Ferenc,
> > >
> > > Looks like a good idea.
> > >
> > > I'd prefer sticking to the SQL standard if possible. Would it be
> possible
> > > / sensible to allow for each syntax, perhaps managed by a config
> setting?
> > >
> > > Cheers,
> > >
> > > Jim
> > >
> > > On Tue, Mar 26, 2024 at 6:59 AM Ferenc Csaky ferenc.cs...@pm.me.invalid
> > > wrote:
> > >
> > > > Hello devs,
> > > >
> > > > I would like to start a discussion about FLIP-XXX: Introduce Flink
> SQL
> > > > variables [1].
> > > >
> > > > The main motivation behind this change is to be able to abstract
> Flink SQL
> > > > from
> > > > environment-specific configuration and provide a way to carry jobs
> between
> > > > environments (e.g. dev-stage-prod) without the need to make changes
> in the
> > > > code.
> > > > It can also be a way to decouple sensitive information from the job
> code,
> > > > or help
> > > > with redundant literals.
> > > >
> > > > The main decision regarding the proposed solution is to handle the
> > > > variable resolution
> > > > as early as possible on the given string statement, so the whole
> operation
> > > > is an easy and
> > > > lightweight string replace. But this approach introduces some
> limitations
> > > > as well:
> > > >
> > > > - The executed SQL will always be the unresolved, raw string, so in
> case
> > > > of secrets
> > > > a DESC operation would show them.
> > > > - Changing the value of a variable can break code that uses that
> variable.
> > > >
> > > > For more details, please check the FLIP [1]. There is also a stale
> Jira
> > > > about this [2].
> > > >
> > > > Looking forward to any comments and opinions!
> > > >
> > > > Thanks,
> > > > Ferenc
> > > >
> > > > [1]
> > > >
> https://docs.google.com/document/d/1-eUz-PBCdqNggG_irDT0X7fdL61ysuHOaWnrkZHb5Hc/edit?usp=sharing
> > > > [2] https://issues.apache.org/jira/browse/FLINK-17377
> >
> >
> >
> >
> > --
> > Best,
> > Yanfei
>


Re: Re: [ANNOUNCE] Apache Paimon is graduated to Top Level Project

2024-04-01 Thread Ron liu
Congratulations!

Best,
Ron

Jeyhun Karimov  于2024年4月1日周一 18:12写道:

> Congratulations!
>
> Regards,
> Jeyhun
>
> On Mon, Apr 1, 2024 at 7:43 AM Guowei Ma  wrote:
>
> > Congratulations!
> > Best,
> > Guowei
> >
> >
> > On Mon, Apr 1, 2024 at 11:15 AM Feng Jin  wrote:
> >
> > > Congratulations!
> > >
> > > Best,
> > > Feng Jin
> > >
> > > On Mon, Apr 1, 2024 at 10:51 AM weijie guo 
> > > wrote:
> > >
> > >> Congratulations!
> > >>
> > >> Best regards,
> > >>
> > >> Weijie
> > >>
> > >>
> > >> Hang Ruan  于2024年4月1日周一 09:49写道:
> > >>
> > >> > Congratulations!
> > >> >
> > >> > Best,
> > >> > Hang
> > >> >
> > >> > Lincoln Lee  于2024年3月31日周日 00:10写道:
> > >> >
> > >> > > Congratulations!
> > >> > >
> > >> > > Best,
> > >> > > Lincoln Lee
> > >> > >
> > >> > >
> > >> > > Jark Wu  于2024年3月30日周六 22:13写道:
> > >> > >
> > >> > > > Congratulations!
> > >> > > >
> > >> > > > Best,
> > >> > > > Jark
> > >> > > >
> > >> > > > On Fri, 29 Mar 2024 at 12:08, Yun Tang 
> wrote:
> > >> > > >
> > >> > > > > Congratulations to all Paimon guys!
> > >> > > > >
> > >> > > > > Glad to see a Flink sub-project has been graduated to an
> Apache
> > >> > > top-level
> > >> > > > > project.
> > >> > > > >
> > >> > > > > Best
> > >> > > > > Yun Tang
> > >> > > > >
> > >> > > > > 
> > >> > > > > From: Hangxiang Yu 
> > >> > > > > Sent: Friday, March 29, 2024 10:32
> > >> > > > > To: dev@flink.apache.org 
> > >> > > > > Subject: Re: Re: [ANNOUNCE] Apache Paimon is graduated to Top
> > >> Level
> > >> > > > Project
> > >> > > > >
> > >> > > > > Congratulations!
> > >> > > > >
> > >> > > > > On Fri, Mar 29, 2024 at 10:27 AM Benchao Li <
> > libenc...@apache.org
> > >> >
> > >> > > > wrote:
> > >> > > > >
> > >> > > > > > Congratulations!
> > >> > > > > >
> > >> > > > > > Zakelly Lan  于2024年3月29日周五 10:25写道:
> > >> > > > > > >
> > >> > > > > > > Congratulations!
> > >> > > > > > >
> > >> > > > > > >
> > >> > > > > > > Best,
> > >> > > > > > > Zakelly
> > >> > > > > > >
> > >> > > > > > > On Thu, Mar 28, 2024 at 10:13 PM Jing Ge
> > >> > >  > >> > > > >
> > >> > > > > > wrote:
> > >> > > > > > >
> > >> > > > > > > > Congrats!
> > >> > > > > > > >
> > >> > > > > > > > Best regards,
> > >> > > > > > > > Jing
> > >> > > > > > > >
> > >> > > > > > > > On Thu, Mar 28, 2024 at 1:27 PM Feifan Wang <
> > >> > zoltar9...@163.com>
> > >> > > > > > wrote:
> > >> > > > > > > >
> > >> > > > > > > > > Congratulations!——
> > >> > > > > > > > >
> > >> > > > > > > > > Best regards,
> > >> > > > > > > > >
> > >> > > > > > > > > Feifan Wang
> > >> > > > > > > > >
> > >> > > > > > > > >
> > >> > > > > > > > >
> > >> > > > > > > > >
> > >> > > > > > > > > At 2024-03-28 20:02:43, "Yanfei Lei" <
> > fredia...@gmail.com
> > >> >
> > >> > > > wrote:
> > >> > > > > > > > > >Congratulations!
> > >> > > > > > > > > >
> > >> > > > > > > > > >Best,
> > >> > > > > > > > > >Yanfei
> > >> > > > > > > > > >
> > >> > > > > > > > > >Zhanghao Chen 
> > 于2024年3月28日周四
> > >> > > > 19:59写道:
> > >> > > > > > > > > >>
> > >> > > > > > > > > >> Congratulations!
> > >> > > > > > > > > >>
> > >> > > > > > > > > >> Best,
> > >> > > > > > > > > >> Zhanghao Chen
> > >> > > > > > > > > >> 
> > >> > > > > > > > > >> From: Yu Li 
> > >> > > > > > > > > >> Sent: Thursday, March 28, 2024 15:55
> > >> > > > > > > > > >> To: d...@paimon.apache.org 
> > >> > > > > > > > > >> Cc: dev ; user <
> > >> > u...@flink.apache.org
> > >> > > >
> > >> > > > > > > > > >> Subject: Re: [ANNOUNCE] Apache Paimon is graduated
> to
> > >> Top
> > >> > > > Level
> > >> > > > > > > > Project
> > >> > > > > > > > > >>
> > >> > > > > > > > > >> CC the Flink user and dev mailing list.
> > >> > > > > > > > > >>
> > >> > > > > > > > > >> Paimon originated within the Flink community,
> > initially
> > >> > > known
> > >> > > > as
> > >> > > > > > Flink
> > >> > > > > > > > > >> Table Store, and all our incubating mentors are
> > >> members of
> > >> > > the
> > >> > > > > > Flink
> > >> > > > > > > > > >> Project Management Committee. I am confident that
> the
> > >> > bonds
> > >> > > of
> > >> > > > > > > > > >> enduring friendship and close collaboration will
> > >> continue
> > >> > to
> > >> > > > > > unite the
> > >> > > > > > > > > >> two communities.
> > >> > > > > > > > > >>
> > >> > > > > > > > > >> And congratulations all!
> > >> > > > > > > > > >>
> > >> > > > > > > > > >> Best Regards,
> > >> > > > > > > > > >> Yu
> > >> > > > > > > > > >>
> > >> > > > > > > > > >> On Wed, 27 Mar 2024 at 20:35, Guojun Li <
> > >> > > > > gjli.schna...@gmail.com>
> > >> > > > > > > > > wrote:
> > >> > > > > > > > > >> >
> > >> > > > > > > > > >> > Congratulations!
> > >> > > > > > > > > >> >
> > >> > > > > > > > > >> > Best,
> > >> > > > > > > > > >> > Guojun
> > >> > > > > > > > > >> >
> > >> > > > > > > > > >> > On Wed, Mar 27, 2024 at 5:24 PM wulin <
> > >> > > ouyangwu...@163.com>
> > >> > > > > > wrote:
> > >> > > > > > > > > >> >
> > >> > > > > > > 

RE: Re: Iceberg table maintenance

2024-04-01 Thread wenjin
Hi Peter,
Thanks a lot for your answers; this detailed explanation has cleared up my 
confusion and been greatly beneficial to me. If you don't mind, could I 
discuss two more questions with you?

As you mentioned in your proposal and answers, the maintenance tasks may cause 
resource interference and delay checkpoints; in my opinion, they may also 
backpressure the upstream when performance problems exist. So, would it be 
better to recommend that users run maintenance tasks in a separate Flink job?

As you mentioned in your proposal: "We can serialize different maintenance 
tasks by chaining them together, but maintenance tasks overlapping from 
consecutive runs also need to be prevented.” 
In my understanding, if maintenance tasks are chained together in one vertex, 
just like 
"scheduler1->task1->scheduler2->task2->scheduler3->task3->scheduler4->task4", 
they will be executed serially, and only after task4 finishes will scheduler1 
process the next record. How can the overlapping of maintenance tasks happen?
On the other hand, ensuring maintenance tasks do not run concurrently by 
chaining them together is not guaranteed, for there may be cases that disable 
the chain. In this case, I think using tags is a better way than lock 
mechanisms, for simplicity and ease of use.

Thanks,
Wenjin.

On 2024/03/30 13:22:12 Péter Váry wrote:
> Hi Wenjin,
> 
> See my answers below:
> 
> On Sat, Mar 30, 2024, 10:54 wenjin  wrote:
> 
> > Hi Peter,
> >
> > I am interested in your proposal and think make iceberg Flink Connector
> > support running maintenance task is meaningful . If possible, could you
> > help me clarify a few confusions.
> >
> > - When the iceberg table is written by single Flink job (use case1, 2),the
> > maintenance tasks will be added to the post commit topology. How dose the
> > maintenance tasks execute? Synchronously or Asynchronously? Will the
> > maintenance tasks block the data processing of Flink job?
> >
> 
> The scheduling and maintenance tasks are just regular Flink operators. Also
> the scheduler will make sure that the maintenance tasks are not chained to
> the Iceberg committer, so I would call this Asynchronous.
> Flink operators do not block other operators, but the maintenance tasks are
> competing for resources with the other data processing tasks. That is why
> we provide the possibility to define slot sharing groups for the
> maintenance tasks. This allows the users to separate the provided resources
> as much as Flink allows.
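
For reference, a minimal sketch of isolating an operator with a slot sharing 
group in the DataStream API (the operator body and group name are 
illustrative):

```
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch: keep a maintenance operator out of the main pipeline's slots by
// giving it a dedicated slot sharing group.
public class MaintenanceSlotSharingSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the stream of commit results driving maintenance.
        DataStream<String> commitResults = env.fromElements("snapshot-1", "snapshot-2");

        commitResults
                .map(trigger -> "compacting after " + trigger)
                // Operators in this group do not share slots with the
                // data-processing operators of the job.
                .slotSharingGroup("table-maintenance")
                .print();

        env.execute("slot-sharing-sketch");
    }
}
```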
> 
> I have seen only one exception to this separation, where we emit a high
> number of records in the maintenance flow, which would cause delays in
> starting the checkpoints, but it could be mitigated by enabling unaligned
> checkpoints and using AsyncIO. There is one issue with AsyncIO found by
> Gyula Fora: https://issues.apache.org/jira/browse/FLINK-34704 which means,
> even with AsyncIO the checkpoint could be blocked until at least one
> compaction group is finished.
> 
> - When the iceberg table is written by multi Flink jobs (use case 3), user
> > need to create a separate Flink job to run the maintenance task. In this
> > case, if user do not create a single job, but enable run maintenance task
> > in exist Flink jobs just like use case 1, what would be the consequences?
> > Or, is there an automatic mechanism to avoid this issue?
> >
> 
> The user needs to create a new job, or choose a single job to run the
> maintenance tasks to avoid running concurrent instances of the compaction
> tasks.
> Even if concurrent compaction tasks could be handled, they would be a
> serious waste of resources and increase the likelihood of failing tasks due
> to concurrent changes on the table. So we do not plan to support this ATM.
> 
> About the difference of the 2 scheduling method:
> - In case 1-2, the scheduling information is coming from the Iceberg
> committer - this is working for a single writer.
> - In case 3, the scheduling information is coming from the monitor - this
> is working for any numbers of writers.
> 
> So even if the maintenance tasks are run in one of the jobs, when there are
> multiple writers, the scheduling should be based on monitoring the changes
> on the table, instead of the information coming from the committer (which
> could only contain the changes only from a single writer)
> 
> I hope this helps,
> Peter
> 
> 
> > Thank you.
> >
> > Best,
> > Wenjin
> >
> > On 2024/03/28 17:59:49 Péter Váry wrote:
> > > Hi Team,
> > >
> > > I am working on adding a possibility to the Flink Iceberg connector to
> > run
> > > maintenance tasks on the Iceberg tables. This will fix the small files
> > > issues and in the long run help compacting the high number of positional
> > > and equality deletes created by Flink tasks writing CDC data to Iceberg
> > > tables without the need of Spark in the infrastructure.
> > >
> > > I did some planning, prototyping and currently trying out the solution
> > on a
> > > larger scale.
> > >
> > > I put together a document h

[jira] [Created] (FLINK-34985) It doesn't support accessing fields by name for map functions in thread mode

2024-04-01 Thread Dian Fu (Jira)
Dian Fu created FLINK-34985:
---

 Summary: It doesn't support accessing fields by name for map 
functions in thread mode
 Key: FLINK-34985
 URL: https://issues.apache.org/jira/browse/FLINK-34985
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Reporter: Dian Fu


Reported in slack channel: 
[https://apache-flink.slack.com/archives/C065944F9M2/p1711640068929589]

```
hi all, I seem to be running into an issue when switching to thread mode in 
PyFlink. In a UDF the {{Row}} seems to get converted into a tuple and you 
cannot access fields by their name anymore. In process mode it works fine. This 
bug can easily be reproduced using this minimal example, which is close to the 
PyFlink docs:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import Row
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
t_env.get_config().set("parallelism.default", "1")


# This does work:
t_env.get_config().set("python.execution-mode", "process")

# This doesn't work:
#t_env.get_config().set("python.execution-mode", "thread")


def map_function(a: Row) -> Row:
    return Row(a.a + 1, a.b * a.b)


# map operation with a python general scalar function
func = udf(
    map_function,
    result_type=DataTypes.ROW(
        [
            DataTypes.FIELD("a", DataTypes.BIGINT()),
            DataTypes.FIELD("b", DataTypes.BIGINT()),
        ]
    ),
)
table = (
    t_env.from_elements(
        [(2, 4), (0, 0)],
        schema=DataTypes.ROW(
            [
                DataTypes.FIELD("a", DataTypes.BIGINT()),
                DataTypes.FIELD("b", DataTypes.BIGINT()),
            ]
        ),
    )
    .map(func)
    .alias("a", "b")
    .execute()
    .print()
)
```
 
The exception I get in this execution mode is:
2024-03-28 16:32:10 Caused by: pemja.core.PythonException: <class 'AttributeError'>: 'tuple' object has no attribute 'a'
2024-03-28 16:32:10 at 
/usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/embedded/operation_utils.process_element(operation_utils.py:72)
2024-03-28 16:32:10 at 
/usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/table/operations.process_element(operations.py:102)
2024-03-28 16:32:10 at .(:1)
2024-03-28 16:32:10 at 
/opt/flink/wouter/minimal_example.map_function(minimal_example.py:19)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-435: Introduce a New Dynamic Table for Simplifying Data Pipelines

2024-04-01 Thread Ron liu
Hi, Venkata krishnan

Thank you for your involvement and suggestions; I hope the design goals of 
this FLIP will be helpful to your business.

>>> 1. In the proposed FLIP, given the example for the dynamic table, do the
data sources always come from a single lake storage such as Paimon or does
the same proposal solve for 2 disparate storage systems like Kafka and
Iceberg where Kafka events are ETLed to Iceberg similar to Paimon?
Basically the lambda architecture that is mentioned in the FLIP as well.
I'm wondering if it is possible to switch b/w sources based on the
execution mode, for eg: if it is backfill operation, switch to a data lake
storage system like Iceberg, otherwise an event streaming system like Kafka.

Dynamic table is a design abstraction at the framework level and is not
tied to the physical implementation of the connector. If a connector
supports a combination of Kafka and lake storage, this works fine.

>>> 2. What happens in the context of a bootstrap (batch) + nearline update
(streaming) case that are stateful applications? What I mean by that is,
will the state from the batch application be transferred to the nearline
application after the bootstrap execution is complete?

I think this is another orthogonal thing, something that FLIP-327 tries to
address, not directly related to Dynamic Table.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-327%3A+Support+switching+from+batch+to+stream+mode+to+improve+throughput+when+processing+backlog+data

Best,
Ron

Venkatakrishnan Sowrirajan  于2024年3月30日周六 07:06写道:

> Ron and Lincoln,
>
> Great proposal and interesting discussion for adding support for dynamic
> tables within Flink.
>
> At LinkedIn, we are also trying to solve compute/storage convergence for
> similar problems discussed as part of this FLIP, specifically periodic
> backfill, bootstrap + nearline update use cases using single implementation
> of business logic (single script).
>
> Few clarifying questions:
>
> 1. In the proposed FLIP, given the example for the dynamic table, do the
> data sources always come from a single lake storage such as Paimon or does
> the same proposal solve for 2 disparate storage systems like Kafka and
> Iceberg where Kafka events are ETLed to Iceberg similar to Paimon?
> Basically the lambda architecture that is mentioned in the FLIP as well.
> I'm wondering if it is possible to switch b/w sources based on the
> execution mode, for eg: if it is backfill operation, switch to a data lake
> storage system like Iceberg, otherwise an event streaming system like
> Kafka.
> 2. What happens in the context of a bootstrap (batch) + nearline update
> (streaming) case that are stateful applications? What I mean by that is,
> will the state from the batch application be transferred to the nearline
> application after the bootstrap execution is complete?
>
> Regards
> Venkata krishnan
>
>
> On Mon, Mar 25, 2024 at 8:03 PM Ron liu  wrote:
>
> > Hi, Timo
> >
> > Thanks for your quick response, and your suggestion.
> >
> > Yes, this discussion has turned into confirming whether it's a special
> > table or a special MV.
> >
> > 1. The key problem with MVs is that they don't support modification, so I
> > prefer it to be a special table. Although the periodic refresh behavior
> is
> > more characteristic of an MV, since it is already a special table,
> > supporting periodic refresh behavior is quite natural, similar to
> Snowflake
> > dynamic tables.
> >
> > 2. Regarding the keyword UPDATING, since the current Regular Table is a
> > Dynamic Table, which implies support for updating through Continuous
> Query,
> > I think it is redundant to add the keyword UPDATING. In addition,
> UPDATING
> > cannot reflect the Continuous Query part and cannot express the purpose we
> > want to achieve: simplifying the data pipeline through Dynamic Table +
> > Continuous Query.
> >
> > 3. From the perspective of the SQL standard definition, I can understand
> > your concerns about Derived Table, but is it possible to make a slight
> > adjustment to meet our needs? Additionally, as Lincoln mentioned, the
> > Google Looker platform has introduced Persistent Derived Table, and there
> > are precedents in the industry; could Derived Table be a candidate?
> >
> > Of course, look forward to your better suggestions.
> >
> > Best,
> > Ron
> >
> >
> >
> > Timo Walther  于2024年3月25日周一 18:49写道:
> >
> > > After thinking about this more, this discussion boils down to whether
> > > this is a special table or a special materialized view. In both cases,
> > > we would need to add a special keyword:
> > >
> > > Either
> > >
> > > CREATE UPDATING TABLE
> > >
> > > or
> > >
> > > CREATE UPDATING MATERIALIZED VIEW
> > >
> > > I still feel that the periodic refreshing behavior is closer to a MV.
> If
> > > we add a special keyword to MV, the optimizer would know that the data
> > > cannot be used for query optimizations.
> > >
> > > I will ask more people for their opinion.
> > >
> > > Regards,
> > > Timo

Re: [DISCUSS] Externalized Google Cloud Connectors

2024-04-01 Thread Leonard Xu
Hey, Claire

Thanks for starting this discussion. All Flink external connector repos are 
sub-projects of Apache Flink, including 
https://github.com/apache/flink-connector-aws.

Creating a Flink external connector repo named flink-connectors-gcp as a 
sub-project of Apache Beam is not a good idea from my side.

>   Currently, we have no Flink committers on our team. We are actively
>   involved in the Apache Beam community and have a number of ASF members on
>   the team.

Not having a Flink committer on the team should not be a blocker in this 
case. The Flink community welcomes contributors to contribute and maintain 
the connectors; as a contributor, through continuous connector development 
and maintenance work in the community, you will also have the opportunity to 
become a committer.

Best,
Leonard


> 2024年2月14日 上午12:24,Claire McCarthy  写道:
> 
> Hi Devs!
> 
> I’d like to kick off a discussion on setting up a repo for a new fleet of
> Google Cloud connectors.
> 
> A bit of context:
> 
>   -
> 
>   We have a team of Google engineers who are looking to build/maintain
>   5-10 GCP connectors for Flink.
>   -
> 
>   We are wondering if it would make sense to host our connectors under the
>   ASF umbrella following a similar repo structure as AWS (
>   https://github.com/apache/flink-connector-aws). In our case:
>   apache/flink-connectors-gcp.
>   -
> 
>   Currently, we have no Flink committers on our team. We are actively
>   involved in the Apache Beam community and have a number of ASF members on
>   the team.
> 
> 
> We saw that one of the original motivations for externalizing connectors
> was to encourage more activity and contributions around connectors by
> easing the contribution overhead. We understand that the decision was
> ultimately made to host the externalized connector repos under the ASF
> organization. For the same reasons (release infra, quality assurance,
> integration with the community, etc.), we would like all GCP connectors to
> live under the ASF organization.
> 
> We want to ask the Flink community what you all think of this idea, and
> what would be the best way for us to go about contributing something like
> this. We are excited to contribute and want to learn and follow your
> practices.
> 
> A specific issue we know of is that our changes need approval from Flink
> committers. Do you have a suggestion for how best to go about a new
> contribution like ours from a team that does not have committers? Is it
> possible, for example, to partner with a committer (or a small cohort) for
> tight engagement? We also know about ASF voting and release process, but
> that doesn't seem to be as much of a potential hurdle.
> 
> Huge thanks in advance for sharing your thoughts!
> 
> 
> Claire



[jira] [Created] (FLINK-34986) Basic framework for async execution of state

2024-04-01 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-34986:
---

 Summary: Basic framework for async execution of state
 Key: FLINK-34986
 URL: https://issues.apache.org/jira/browse/FLINK-34986
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / State Backends, Runtime / Task
Reporter: Zakelly Lan
Assignee: Zakelly Lan






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Re: Iceberg table maintenance

2024-04-01 Thread Péter Váry
Thanks Wenjin for your response.

See my answers below:

On Tue, Apr 2, 2024, 04:08 wenjin  wrote:

> Hi Peter,
> Thanks a lot for your answers; this detailed explanation has cleared up my
> confusion and been greatly beneficial to me. If you don't mind, could I
> discuss two more questions with you?
>
> As you mentioned in your proposal and answers, the maintenance tasks may
> cause resource interference and delay checkpoints; in my opinion, they may
> also backpressure the upstream when performance problems exist. So, would
> it be better to recommend that users run maintenance tasks in a separate
> Flink job?
>

For some users - small tables, manageable amounts of data - architectural
simplicity is more important, than resource usage. Also, in the long term,
I hope Autoscaling can help with in-place scaling for these jobs.

But I definitely agree, that bigger, more resource constrained jobs are
need to separate out compaction to another job.


> As you mentioned in your proposal: "We can serialize different maintenance
> tasks by chaining them together, but maintenance tasks overlapping from
> consecutive runs also need to be prevented.”
> In my understanding, if maintenance tasks are chained together in one
> vertex, just like
> "scheduler1->task1->scheduler2->task2->scheduler3->task3->schduler4->task4",they
> will be executed serially,and only after task4 finished, scheduler1 will
> process next record. How can the overlapping of maintenance tasks happen?
>

When I talk about chained tasks, they are not chained into a single vertex.

They use the output of the previous task to start the next task, but
all of them have multiple operators (some of them with different
parallelism), which prevents them from being merged into a single vertex.

So overlapping could happen if a new input triggers parallel scheduling.

> On the other hand, ensuring maintenance tasks do not run concurrently by
> chaining them together is not guaranteed, for there may be cases that
> disable the chain. In this case, I think using tags is a better way than
> lock mechanisms, for simplicity and ease of use.
>
> Thanks,
> Wenjin.
>
> On 2024/03/30 13:22:12 Péter Váry wrote:
> > Hi Wenjin,
> >
> > See my answers below:
> >
> > On Sat, Mar 30, 2024, 10:54 wenjin  wrote:
> >
> > > Hi Peter,
> > >
> > > I am interested in your proposal and think make iceberg Flink Connector
> > > support running maintenance task is meaningful . If possible, could you
> > > help me clarify a few confusions.
> > >
> > > - When the iceberg table is written by single Flink job (use case1,
> 2),the
> > > maintenance tasks will be added to the post commit topology. How dose
> the
> > > maintenance tasks execute? Synchronously or Asynchronously? Will the
> > > maintenance tasks block the data processing of Flink job?
> > >
> >
> > The scheduling and maintenance tasks are just regular Flink operators.
> Also
> > the scheduler will make sure that the maintenance tasks are not chained
> to
> > the Iceberg committer, so I would call this Asynchronous.
> > Flink operators do not block other operators, but the maintenance tasks
> are
> > competing for resources with the other data processing tasks. That is why
> > we provide the possibility to define slot sharing groups for the
> > maintenance tasks. This allows the users to separate the provided
> resources
> > as much as Flink allows.
> >
> > I have seen only one exception to this separation where we emit high
> number
> > of records in the maintenance flow, which would cause delays in starting
> > the checkpoints, but it could be mitigated by enabling unaligned
> > checkpoints and using AsyncIO. There is one issue with AsyncIO found by
> > Gyula Fora: https://issues.apache.org/jira/browse/FLINK-34704 which
> means,
> > even with AsyncIO the checkpoint could be blocked until at least one
> > compaction group is finished.
> >
> > - When the iceberg table is written by multi Flink jobs (use case 3),
> user
> > > need to create a separate Flink job to run the maintenance task. In
> this
> > > case, if user do not create a single job, but enable run maintenance
> task
> > > in exist Flink jobs just like use case 1, what would be the
> consequences?
> > > Or, is there an automatic mechanism to avoid this issue?
> > >
> >
> > The user needs to create a new job, or choose a single job to run the
> > maintenance tasks to avoid running concurrent instances of the compaction
> > tasks.
> > Even if concurrent compaction tasks could be handled, they would be a
> > serious waste of resources and increase the likelihood of failing tasks
> due
> > to concurrent changes on the table. So we do not plan to support this
> ATM.
> >
> > About the difference of the 2 scheduling method:
> > - In case 1-2, the scheduling information is coming from the Iceberg
> > committer - this is working for a single writer.
> > - In case 3, the scheduling information is coming from the monitor - this
> > is working for any numbers of writers.
> >
> > S