Re: [RESULT] [VOTE] Release 1.19.2, release candidate #1

2025-02-13 Thread Zach Zhang
Hi Alex,

Has Flink 1.19.2 been released? If not, what is the target date?

Leonard Xu wrote on Wed, Feb 12, 2025 at 09:57:

> Thanks Alex for driving this release, Good Job!
>
> minor: the XXX placeholders should be replaced by the vote numbers, i.e. there are 7 approving
> votes, 4 of which are binding
>
> Best,
> Leonard
>
> > On Feb 11, 2025, at 23:29, Alexander Fedulov wrote:
> >
> > Hi all,
> >
> > I'm happy to announce that we have unanimously approved this release.
> >
> > There are XXX approving votes, XXX of which are binding:
> > * Maximilian Michels (binding)
> > * Marton Balassi (binding)
> > * Leonard Xu (binding)
> > * Robert Metzger (binding)
> > * Yanquan Lv (non-binding)
> > * Ferenc Csaky (non-binding)
> > * Sergey Nuyanzin (non-binding)
> >
> > There are no disapproving votes.
> >
> > Thanks everyone!
> >
> > [1] https://lists.apache.org/thread/9tqhyc160svt8q697gnn76djdxfd5hzg
> >
> > Best,
> > Alex
>
>


[jira] [Created] (FLINK-37322) Support eval Groovy scripts UDF for Flink SQL

2025-02-13 Thread tianyuan (Jira)
tianyuan created FLINK-37322:


 Summary: Support eval Groovy scripts UDF for Flink SQL
 Key: FLINK-37322
 URL: https://issues.apache.org/jira/browse/FLINK-37322
 Project: Flink
  Issue Type: New Feature
Reporter: tianyuan


I want to add a new UDF that dynamically executes Groovy scripts to achieve any 
desired functionality, just like using Lua scripts in Redis. I have implemented 
this UDF locally; it has very efficient execution performance and strong 
flexibility. Usage is as follows:

select eval_groovy_script('INT', 'return arg0+arg1', 1, 2);

select eval_groovy_script('INT', 'return p1+p2', MAP['p1', 2, 'p2', 5]);

select eval_groovy_script('MAP', 'return ["TopicName" : "Lists", 
"Author" : "Raghav"] ');

This UDF requires the following three parameters:

1. The data return type

2. The Groovy script

3. The parameters passed to the Groovy script. If they are passed as varargs, 
reference them in the script as arg{index}. If they are passed as a map, 
reference them in the script by their keys.

Apache Pinot has implemented similar functionality:
https://docs.pinot.apache.org/users/user-guide-query/scalar-functions
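
For illustration, a minimal sketch of how such a UDF could be wired up with 
GroovyShell (the class name, argument binding, and dynamic return-type handling 
here are simplified assumptions, not the actual implementation proposed in this 
ticket):

import groovy.lang.Binding;
import groovy.lang.GroovyShell;
import org.apache.flink.table.functions.ScalarFunction;

public class EvalGroovyScriptFunction extends ScalarFunction {

    // Binds positional arguments as arg0, arg1, ... and evaluates the script.
    // A real implementation would also map the declared return type string
    // (e.g. 'INT', 'MAP') to Flink's type inference and cache compiled scripts.
    public Object eval(String returnType, String script, Object... args) {
        Binding binding = new Binding();
        for (int i = 0; i < args.length; i++) {
            binding.setVariable("arg" + i, args[i]);
        }
        return new GroovyShell(binding).evaluate(script);
    }
}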

I hope a committer will recognize my idea and assign this task to me.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-491: BundledAggregateFunction for batched aggregation

2025-02-13 Thread Alan Sheinberg
Thanks for the comments Timo.

1. Consistency:

Could we rename the `canXX` methods to `supportsXX` methods?
> This might fit better to the existing methods such as
> `FunctionDefinition.supportsConstantFolding()` and other ability
> interfaces that we use in connectors.

Sounds good.  I changed them to `supportsXX`.

 Could we rename the table option `table.exec.agg-bundled` to
> `table.exec.bundled-agg`?  This fits better to existing options that
> also put the "agg" at the end e.g. `hash-agg` or `window-agg`.


That makes sense.  I changed it to table.exec.bundled-agg.


2. Making the interface explicit

It shouldn't be a problem to explicitly add the
> `bundledAccumulateRetract()` method to the `BundledAggregateFunction`.
> The signature is not dynamic like accumulare()/retract(). We can add a
> default implementation such as:

default List
> bundledAccumulateRetract(List batch)
>  throws Exception {
> throw new UnsupportedOperationException("Method must be implemented
> when supportsBundling() is true.");
> }


 You're right.  Maybe I was being a bit fancier than is required.  I wanted
the ability for implementations which return false for supportsBundling to
not have to do anything, but that's easily solved with the default
implementation above.
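
For reference, a rough sketch of how the interface could end up looking after
these renames (the generic parameters, default values, and the BundledKeySegment
placeholder below are assumptions based on this thread, not the final FLIP text):

import java.util.List;

// Placeholder for the FLIP's segment type (key, input rows, accumulators); the
// real definition lives in the FLIP and is not reproduced here.
class BundledKeySegment {}

public interface BundledAggregateFunction {

    // Renamed from canXX to supportsXX as discussed above.
    default boolean supportsBundling() { return false; }
    default boolean supportsRetract() { return false; }
    default boolean supportsMerge() { return false; }

    // Only required when supportsBundling() returns true.
    default List<BundledKeySegment> bundledAccumulateRetract(List<BundledKeySegment> batch)
            throws Exception {
        throw new UnsupportedOperationException(
                "Method must be implemented when supportsBundling() is true.");
    }
}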

Otherwise the FLIP seems to be in a good shape.

+1 for voting.


Great.  Will call a vote tomorrow.

Thanks again,
Alan

On Thu, Feb 13, 2025 at 8:46 AM Timo Walther  wrote:

> Thank you for the update Alan. I just have some last minor comments,
> mostly around naming:
>
> 1. Consistency:
>
> Could we rename the `canXX` methods to `supportsXX` methods?
> This might fit better to the existing methods such as
> `FunctionDefinition.supportsConstantFolding()` and other ability
> interfaces that we use in connectors.
>
> Could we rename the table option `table.exec.agg-bundled` to
> `table.exec.bundled-agg`?  This fits better to existing options that
> also put the "agg" at the end e.g. `hash-agg` or `window-agg`.
>
> 2. Making the interface explicit
>
> It shouldn't be a problem to explicitly add the
> `bundledAccumulateRetract()` method to the `BundledAggregateFunction`.
> The signature is not dynamic like accumulate()/retract(). We can add a
> default implementation such as:
>
> default List
> bundledAccumulateRetract(List batch)
>  throws Exception {
> throw new UnsupportedOperationException("Method must be implemented
> when supportsBundling() is true.");
> }
>
>
> Otherwise the FLIP seems to be in a good shape.
>
> +1 for voting.
>
> Regards,
> Timo
>
>
> On 07.02.25 22:02, Alan Sheinberg wrote:
> > Hi all,
> >
> > Is there any more feedback I can incorporate before calling a vote?
> >
> > Thanks,
> > Alan
> >
> > On Tue, Jan 28, 2025 at 1:50 PM Alan Sheinberg 
> > wrote:
> >
> >> Hi Fabian,
> >>
> >> I addressed your comments below.
> >>
> >>
> >>> * BundledKeySegement
> >>>* should it be accumulatorS instead of accumulator?
> >>>* should accumulators be null or an empty list if no accumulator is
> >>> present?
> >>
> >>
> >> Good catch, forgot to update those places.  It should be "accumulators"
> >> and an empty list when there are none. Updated in the FLIP.
> >>
> >>>
> >>> * update the Group By Example to use a list of accumulator instead of a
> >>> single value
> >>
> >>
> >> Updated.
> >>
> >> * fix the `AvgAggregate` example:
> >>>* add missing `canRetract()`, and `canMerge()` methods
> >>>* example uses `batch()` method instead of
> `bundledAccumulateRetract()`
> >>>* example works with a single accumulator, not a list of
> accumulators
> >>
> >>
> >> Updated AvgAggregate to address these things.  They should be consistent
> >> with the interface discussed.
> >>
> >> * in general, check the code examples for compliance with the proposed
> API.
> >>>* Some use `bundle()` instead of `bundledAccumulateRetract()`.
> >>>* There might be other mistakes that sneaked when evolving the API
> >>
> >>
> >> Yeah, you're right.  I'll read through it again and try to find any
> >> missing updates.
> >>
> >> Thanks,
> >> Alan
> >>
> >> On Tue, Jan 28, 2025 at 3:58 AM Fabian Hüske
> 
> >> wrote:
> >>
> >>> Thanks Alan for updating the FLIP!
> >>>
> >>> IMO, it looks good.
> >>>
> >>> Just a few nits that would be great to fix for consistency:
> >>>
> >>> * BundledKeySegement
> >>>* should it be accumulatorS instead of accumulator?
> >>>* should accumulators be null or an empty list if no accumulator is
> >>> present?
> >>> * update the Group By Example to use a list of accumulator instead of a
> >>> single value
> >>> * fix the `AvgAggregate` example:
> >>>* add missing `canRetract()`, and `canMerge()` methods
> >>>* example uses `batch()` method instead of
> `bundledAccumulateRetract()`
> >>>* example works with a single accumulator, not a list of
> accumulators
> >>> * in general, check the code examples for compliance with the proposed
> >>> API.
> >>>* Some use `bu

Unsubscribe

2025-02-13 Thread cun8cun8
Unsubscribe

Re: [DISCUSS] Elasticsearch v3.1 release

2025-02-13 Thread weijie guo
Hi Ahmed,

I am currently working on adopting Flink 2.0 for the ES connector. Before that, I
need to release a version that supports 1.20. But I saw that you have
already kicked off this Elasticsearch v3.1 release, so I was wondering
where this is going?
I can help with the steps that require committer access if you want.

Best regards,

Weijie


Ahmed Hamdy wrote on Tue, Dec 3, 2024 at 22:03:

> Hi Yanquan,
> I have kicked off the release process for v3.1, will try to conclude soon
> and announce.
> Sorry for the delays, it is quite hard to process the release without PMC
> permissions but Danny is kindly offering help when needed.
>
> Best Regards
> Ahmed Hamdy
>
>
> On Tue, 3 Dec 2024 at 12:05, Yanquan Lv  wrote:
>
> > Hi Ahmed,
> > I would like to kindly inquire about the current plan for release v3.1,
> > are there any other conclusions from offline communication?
> >
> > > On Oct 14, 2024, at 17:21, Ahmed Hamdy wrote:
> > >
> > > Hi all,
> > > I want to kick off the discussion for elasticsearch connector release
> > v3.1.
> > > Our latest release v3.0.1 was more than a year ago[1]. Since then the
> > > connector has developed an important set of features like SinkV2
> > > compatibility update[2] and most importantly Elasticsearch 8 support[3]
> > > which is of high demand. I believe it might be time to release V3.1.
> > > I am happy to drive the release with an assist from a PMC for
> > > privileged actions or even assist if a PMC is willing to lead.
> > >
> > > Let me know your thoughts.
> > >
> > > 1-https://lists.apache.org/thread/374078blmqgfvtt41pbbzr2df37k2nc0
> > > 2-https://issues.apache.org/jira/browse/FLINK-34113
> > > 3-https://issues.apache.org/jira/browse/FLINK-26088
> > >
> > >
> > > Best Regards
> > > Ahmed Hamdy
> >
> >
>


Re: Kafka connector releases

2025-02-13 Thread Yanquan Lv
Hi, Devs.

I am now working on bumping the Kafka connector to Flink 2.0 for the 4.0.0 version, 
and I have already opened a PR for this [1]. Flink 2.0-preview1 contains a bug 
(fixed in Flink 2.0 by FLINK-36568 [2]) that causes the e2e cases to fail, so I 
commented out these e2e tests to pass CI.
However, considering the reliability of the release, the community is concerned 
about the lack of e2e testing, so the work on bumping to 2.0-preview1 has not been 
completed yet.

At present, the official version of Flink 2.0 is also about to be released. 
Would it be better for us to bump directly to this official version before 
continuing with the release of flink-connector-kafka 4.0.0?
Thanks for any feedback from the community regarding the release of version 
4.0.0.


[1] https://github.com/apache/flink-connector-kafka/pull/140
[2] https://issues.apache.org/jira/browse/FLINK-36568



> On Nov 15, 2024, at 04:09, Arvid Heise wrote:
> 
> Hi all,
> 
> I just sent out the first release candidate for 3.4.0. The only difference
> to 3.3.0 is that we cease to support 1.19 in favor of supporting the open
> lineage interfaces [1] that have been added to Flink 1.20.
> 
> With that release under way (please verify and vote!), the main branch is
> now good to go for the Flink 2.0-preview release. @Qingsheng Ren
>  and Leonard are driving that release, Yanquan already
> volunteered to help. I'm pretty sure more help is always welcome. Maybe it
> even makes sense to organize in Slack for that release.
> 
> Best of luck and don't hesitate to reach out to me if you need another
> opinion.
> 
> Arvid
> 
> [1] https://issues.apache.org/jira/browse/FLINK-34466
> 
> On Wed, Nov 6, 2024 at 2:16 PM Leonard Xu  wrote:
> 
>> Thanks @Yanquan for track this thread, and thanks @Arvid for the
>> information, it makes sense to me
>> 
>> Qingsheng will drive the release of flink-2.0-preview of Flink Kafka
>> connector, and I’d like assist it too.
>> 
>> Best,
>> Leonard
>> 
>> 
>>> On Nov 6, 2024, at 6:58 PM, Arvid Heise wrote:
>>> 
>>> Hi Yanquan,
>>> 
>>> the current state of the 3.4.0 release is that it's still pending on the
>>> lineage PR [1] which I expect to be merged next week (the author is on
>>> vacation). The release cut would then happen right afterwards.
>>> 
>>> After the release cut, we can then bump to 4.0.0-SNAPSHOT and Flink
>>> 2.0-preview. @Qingsheng Ren  and Leonard wanted to
>>> drive that release. I already prepared a bit by thoroughly annotating
>>> everything with Deprecated but the whole test side needs a bigger
>> cleanup.
>>> It's probably also a good time to bump other dependencies.
>>> Could you please sync with the two release managers? At least Qingsheng
>> is
>>> responsive in the Flink slack - I talked to him quite a bit there.
>>> 
>>> If there is a pressing need to start earlier, we can also cut the 3.4
>>> branch (which is then effectively the 3.3 branch) earlier and backport
>> the
>>> lineage PR (it's just one commit ultimately). I'd leave that decision to
>>> the two release managers for 4.0.0 mentioned before.
>>> 
>>> One thing to note for 4.0.0 is that we need to solve the transaction
>>> management issues with the KafkaSink [2]. It's blocking larger users from
>>> adopting the KafkaSink which will be the only option for Flink 2.0. I
>> have
>>> started designing a solution.
>>> 
>>> Best,
>>> 
>>> Arvid
>>> 
>>> 
>>> [1] https://github.com/apache/flink-connector-kafka/pull/130
>>> [2] https://issues.apache.org/jira/browse/FLINK-34554
>>> 
>>> On Tue, Nov 5, 2024 at 4:48 AM Yanquan Lv  wrote:
>>> 
 Hi, Arvid.
 
 It has been a month and we are glad to see that we have completed the
 release of Kafka 3.3.0 targeting 1.19 and 1.20.
 
 Considering that Flink 2.0-preview1 has already been released, I would
 like to know about our plans and progress for bumping to 2.0-preview1.
 I tested the changes required for bump to 2.0-preview1 locally and found
 that the adaptation changes made in the production code based on
 FlinkKafkaProducer Depreciated work were relatively clear and the
>> amount of
 change was not significant. However, the headache was that there were
>> many
 adjustments needed in the test code.
 
 I would like to know if there is already work in the community to bump
>> to
 2.0-preview1. If not, I can help complete this task (but some
>> suggestions
 may be needed for testing the adaptation in the code).
 
 
 
 
 
> On Sep 27, 2024, at 16:23, Arvid Heise wrote:
> 
> Dear Flink devs,
> 
> I'd like to initiate three(!) Kafka connector releases. The main reason
 for
> having three releases is that we have been slacking a bit in keeping up
> with the latest changes.
> 
> Here is the summary:
> 1. Release kafka-3.3.0 targeting 1.19 and 1.20 (asap)
> - Incorporates lots of deprecations for Flink 2 including everything
>> that
> is related to FlinkKafkaProducer (SinkFunction, FlinkKafkaConsumer
> 

Re: Kafka connector releases

2025-02-13 Thread Arvid Heise
Hi Yanquan,

thank you very much for taking care of this.

I propose to merge your PR without E2E into 4.0-SNAPSHOT. Then we wait
for Flink 2.0.0 with the fix, re-enable E2E, and lastly we can cut an
official 4.0.0 release (we need to wait for 2.0.0 anyway for the
release).
We could release a 4.0.0-preview if needed (but 4.0-SNAPSHOT could be
good enough).

In this way, we can merge your PR and you don't need to rebase further
while also not sacrificing any reliability.

WDYT?

Best,

Arvid

On Thu, Feb 13, 2025 at 1:45 PM Yanquan Lv  wrote:
>
> Hi, Devs.
>
> I am now working on the Kafka connector bump to Flink 2.0 in 4.0.0 version, 
> and I have already opened a PR for this. Since there is a bug that fix in 
> [FLINK-36568](Flink 2.0) in Flink 2.0-preview1 that would cause fail to run 
> e2e case, so I comment out these e2e tests to pass CI.
> However, considering the reliability of the release, the community is 
> concerned about the lack of e2e testing, So the work on bump 2.0-preview1 has 
> not been completed yet.
>
> At present, the official version of Flink 2.0 is also about to be released. 
> would it be better for us to directly bump to this official version before 
> continuing with the release of flink-kafka-connector 4.0.0?
> Thanks for any feedback from the community regarding the release of version 
> 4.0.0.
>
>
> [1] https://github.com/apache/flink-connector-kafka/pull/140
> [2] https://issues.apache.org/jira/browse/FLINK-36568
>
>
>
> > On Nov 15, 2024, at 04:09, Arvid Heise wrote:
> >
> > Hi all,
> >
> > I just sent out the first release candidate for 3.4.0. The only difference
> > to 3.3.0 is that we cease to support 1.19 in favor of supporting the open
> > lineage interfaces [1] that have been added to Flink 1.20.
> >
> > With that release under way (please verify and vote!), the main branch is
> > now good to go for the Flink 2.0-preview release. @Qingsheng Ren
> >  and Leonard are driving that release, Yanquan already
> > volunteered to help. I'm pretty sure more help is always welcome. Maybe it
> > even makes sense to organize in Slack for that release.
> >
> > Best of luck and don't hesitate to reach out to me if you need another
> > opinion.
> >
> > Arvid
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-34466
> >
> > On Wed, Nov 6, 2024 at 2:16 PM Leonard Xu  wrote:
> >
> >> Thanks @Yanquan for track this thread, and thanks @Arvid for the
> >> information, it makes sense to me
> >>
> >> Qingsheng will drive the release of flink-2.0-preview of Flink Kafka
> >> connector, and I’d like assist it too.
> >>
> >> Best,
> >> Leonard
> >>
> >>
> >>> On Nov 6, 2024, at 6:58 PM, Arvid Heise wrote:
> >>>
> >>> Hi Yanquan,
> >>>
> >>> the current state of the 3.4.0 release is that it's still pending on the
> >>> lineage PR [1] which I expect to be merged next week (the author is on
> >>> vacation). The release cut would then happen right afterwards.
> >>>
> >>> After the release cut, we can then bump to 4.0.0-SNAPSHOT and Flink
> >>> 2.0-preview. @Qingsheng Ren  and Leonard wanted to
> >>> drive that release. I already prepared a bit by thoroughly annotating
> >>> everything with Deprecated but the whole test side needs a bigger
> >> cleanup.
> >>> It's probably also a good time to bump other dependencies.
> >>> Could you please sync with the two release managers? At least Qingsheng
> >> is
> >>> responsive in the Flink slack - I talked to him quite a bit there.
> >>>
> >>> If there is a pressing need to start earlier, we can also cut the 3.4
> >>> branch (which is then effectively the 3.3 branch) earlier and backport
> >> the
> >>> lineage PR (it's just one commit ultimately). I'd leave that decision to
> >>> the two release managers for 4.0.0 mentioned before.
> >>>
> >>> One thing to note for 4.0.0 is that we need to solve the transaction
> >>> management issues with the KafkaSink [2]. It's blocking larger users from
> >>> adopting the KafkaSink which will be the only option for Flink 2.0. I
> >> have
> >>> started designing a solution.
> >>>
> >>> Best,
> >>>
> >>> Arvid
> >>>
> >>>
> >>> [1] https://github.com/apache/flink-connector-kafka/pull/130
> >>> [2] https://issues.apache.org/jira/browse/FLINK-34554
> >>>
> >>> On Tue, Nov 5, 2024 at 4:48 AM Yanquan Lv  wrote:
> >>>
>  Hi, Arvid.
> 
>  It has been a month and we are glad to see that we have completed the
>  release of Kafka 3.3.0 targeting 1.19 and 1.20.
> 
>  Considering that Flink 2.0-preview1 has already been released, I would
>  like to know about our plans and progress for bumping to 2.0-preview1.
>  I tested the changes required for bump to 2.0-preview1 locally and found
>  that the adaptation changes made in the production code based on
>  FlinkKafkaProducer Depreciated work were relatively clear and the
> >> amount of
>  change was not significant. However, the headache was that there were
> >> many
>  adjustments needed in the test code.
> 
>  I would like to know if there is already 

[jira] [Created] (FLINK-37317) Checkpoint asyncOperationsThreadPool has only one thread running

2025-02-13 Thread Jufang He (Jira)
Jufang He created FLINK-37317:
-

 Summary: Checkpoint asyncOperationsThreadPool has only one thread 
running
 Key: FLINK-37317
 URL: https://issues.apache.org/jira/browse/FLINK-37317
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.20.1
Reporter: Jufang He
 Attachments: image-2025-02-13-17-23-11-788.png, 
image-2025-02-13-17-25-47-904.png, image-2025-02-13-17-26-04-449.png

The org.apache.flink.streaming.runtime.tasks.StreamTask#asyncOperationsThreadPool 
configuration looks unreasonable, because in practice only one thread will ever 
run. The asyncCheckpointRunnable execution and close of concurrent checkpoints can 
therefore only run serially. The thread pool configuration is as follows: 
!image-2025-02-13-17-25-01-252.png!

I simulated the execution of the thread pool locally, and the results were as 
follows:

!image-2025-02-13-17-25-47-904.png!

!image-2025-02-13-17-26-04-449.png!
I think there are the following ways to solve this problem:

1. Increase the corePoolSize, at the cost of higher CPU consumption.

2. Use a cached thread pool, which uses a SynchronousQueue as the workQueue so that 
tasks execute immediately. But too many concurrent tasks can take up too many resources.
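
To illustrate the effect, here is a self-contained demo (not the actual StreamTask 
code): with corePoolSize = 1 and an unbounded LinkedBlockingQueue, a 
ThreadPoolExecutor never grows beyond one thread, because additional threads are 
only created when the queue rejects new tasks, which an unbounded queue never does.

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SingleThreadDemo {
    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        for (int i = 0; i < 4; i++) {
            pool.submit(() -> {
                System.out.println(Thread.currentThread().getName());
                try { Thread.sleep(1000); } catch (InterruptedException ignored) {}
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        // Prints the same thread name four times even though maxPoolSize is 4.
    }
}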



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] FLIP-508: Add support for Smile format for Compiled plans

2025-02-13 Thread Fabian Hüske
+1 (binding)

Cheers, Fabian

On Thu, Feb 13, 2025 at 12:07 PM David Radley 
wrote:

> +1 (non-binding)
>
> Kind regards, David.
>
> From: Sergey Nuyanzin 
> Date: Thursday, 13 February 2025 at 10:52
> To: dev@flink.apache.org 
> Subject: [EXTERNAL] [VOTE] FLIP-508: Add support for Smile format for
> Compiled plans
> Hi everyone,
>
> I'd like to start a vote on FLIP-508: Add support for Smile format for
> Compiled plans [1]
> which has been discussed in this thread [2].
>
> The vote will be open for at least 72 hours unless there is an objection
> or not enough votes.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-508%3A+Add+support+for+Smile+format+for+Compiled+plans
> [2] https://lists.apache.org/thread/jtcw002hnty29dnq9gykxkzy8gdkmb8j
>
> --
> Best regards,
> Sergey
>
>


Migrating CI to Github Actions

2025-02-13 Thread Tom Cooper
Hi all,

I was hoping to sync up on the progress on moving from Azure CI to GitHub 
Actions for the main Flink repository. There was FLIP-396 [1] by Matthias Pohl 
detailing the plan to trial GitHub Actions and FLINK-27075 [2] which tracks the 
work. There are several workflows in the repo already and actions do seem to be 
running [3]. The FLIP mentions the 1.19 release as the watershed for deciding 
if we move off Azure CI. As we are now nearly at 2.0, I wondered if there has 
been any more discussion on this?

My main motivation for asking is that, as part of Community Health Initiative 
(CHI) Workgroup [4], we have been looking at how to further speed up/simplify 
PR reviews. One of the main issues with reviews is that the PR has failed to 
pass the CI tests and in many cases that will be something simple like a 
failure of the checkstyle/spotless checks. However, to find that out you need 
to click through several layers of Azure CI UI and parse the test logs. 

It would be useful if we could run these standard linting checks for every PR 
before the main CI is run and make that clearly visible to the submitter on the 
PR via the GH CI UI integration (green ticks or red crosses with a clear 
reason). GitHub Actions seems like a perfect fit for this and indeed we already 
have a workflow for pre-compile checks [5] that would perform this. However, 
that workflow does not run on pull requests. 

So I was wondering:

1) Has there been a discussion on moving forward with the move to GH Actions?
2) If the process has stalled due to a lack of developer time, then the CHI 
members are willing to help but we may need context/help from those previously 
involved.
3) As a minimum, would we be able to enable the pre-compile checks for all PRs?

Thanks,

Tom Cooper
@tomcooper.dev | https://tomcooper.dev

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-396%3A+Trial+to+test+GitHub+Actions+as+an+alternative+for+Flink%27s+current+Azure+CI+infrastructure
[2] https://issues.apache.org/jira/browse/FLINK-27075
[3] https://github.com/apache/flink/actions
[4] 
https://cwiki.apache.org/confluence/display/FLINK/Community+Health+Initiative+%28CHI%29+workgroup
[5] 
https://github.com/apache/flink/blob/master/.github/workflows/template.pre-compile-checks.yml


2025/02/13 CHI Eastern time zone workgroup meeting minutes

2025-02-13 Thread David Radley
Hi,

We held the 11th 30-minute working meeting of the Community Health Initiative 
(CHI) for the Eastern time zone. In 
addition to triaging PRs, we discussed:

  *   15 PRs came in this week.
  *   15 PRs came in this week's count were triaged.
  *   We talked about moving the Flink CI bot type activity to a GitHub Action. 
Robert Metzger said there was some work in this area that had not yet been 
completed. Thomas Cooper and I offered to help complete this.
  *   We talked about creating a list of things that the Flink bot should do. I 
have created a starter list; please let us know if there are other ideas you 
want to add.
  *   We talked about amending the Flink operator release process to include 
creating OLM OperatorHub versions. There are instructions within IBM that we are 
looking to open source and share in order to automate this. We will track this in 
this meeting.

Kind regards, David.



Re: [DISCUSS] Pluggable Batching for Async Sink in Flink

2025-02-13 Thread Poorvank Bhatia
Hello Ahmed,
Thank you for your insights! In fact, FLIP-284's proposal aligns so well with 
this approach that I have added it as future scope.
To answer your questions:

I assume we will not expose batch creators to users but only to sink
implementers, am I correct?

   - Yes, that is correct. The BatchCreator interface is intended to be
     used only by sink implementers, not end users. Since the SinkWriter is
     responsible for defining how records are batched and written, only those
     developing a custom sink would need access to this functionality.

It is nit: but I would prefer adding the creator to the
`AsyncSinkWriterConfiguration` rather than overloading the constructor,
there are some already implemented ways for instantiating the configs with
defaults so would be simpler.


   - Yes, BatchCreator can be included in AsyncSinkWriterConfiguration.
     However, my initial reasoning for keeping it separate was that BatchCreator
     is not just a configuration parameter; it's a functional component that
     directly influences how records are batched. This makes it more of a
     behavioral dependency than a passive setting. That said, if
     incorporating it into AsyncSinkWriterConfiguration aligns better with the
     overall design, it is a nit change and I'm open to updating the approach
     accordingly. Do let me know your thoughts, as RateLimitingStrategy is also
     a part of it.


 I want to leverage some concept from FLIP-284 that we have previously
seen in DynamoDbAsyncSink, do you think we can also customize the trigger
so we can also add a "readyToFlush()" method or something similar and use
it in `nonBlockingFlush` methods instead of forcing the trigger on only
total record count or buffer size. I see this useful in your case as well
because if we have buffered 100 requests from 100 different partitions then
we will keep triggering and emitting batches of 1 for 100 times (unless
maxTimeInBufferMS is used ofc) so you can use the custom trigger for
minimum buffered records per partition. This might be a bit tricky because
in order to not affect the performance we probably will wrap the whole
buffer in the batchCreator to maintain calculations like size in bytes or
per partition record count in the batch creator making it more
"bufferHandler" rather than a "batchCreator", let me know your thoughts
about this


   - You bring up a valid concern about optimizing flush triggers to prevent
     inefficient batch emissions when multiple partitions exist, particularly in
     sinks like DynamoDB and Cassandra. IMO flushing (when to send records) and
     batching (how to group records) should remain separate concerns, as they
     serve distinct purposes.

     To structure this properly, I'll use FLIP-284 as the foundation:

     🔹 BufferFlushTrigger (from FLIP-284) - decides when to flush

     The BufferFlushTrigger should be responsible for determining when the
     buffer is ready to flush, based on configurable conditions such as:
       - Total record count (batch size threshold).
       - Buffer size in bytes.
       - Time-based constraints (maxTimeInBufferMS), or any other custom
         logic a sink may require.

     🔹 BatchCreator - decides how to form a batch

     Once BufferFlushTrigger determines that a flush should occur, BatchCreator
     is responsible for reading the buffered requests and forming a batch based
     on partitioning rules.
       - BatchCreator does not decide when to flush; it only handles how
         records are grouped when a flush is triggered.
       - This separation allows partition-aware batching, ensuring that records
         from the same partition are grouped together before submission.

     🔹 Example: BatchCreator with two partitioning keys

     Scenario:
       - batchSize = 20
       - The buffer contains records from two partitions.
       - Assume we flush when at least 10 records exist per partition.
       - We apply a simple flush strategy that triggers a flush when 20
         total records are buffered.

     How BatchCreator forms the batch: even though BufferFlushTrigger initiates
     the flush when 20 records are reached, BatchCreator must still decide how
     to structure the batch. It does so by:
       1. Reading the buffered records.
       2. Grouping them by partition key (identifying the per-partition groups).
       3. Creating a valid batch from these groups before submitting them
          downstream.

     What I mean is that the buffered requests should be available to both
     flushing and batching for optimized writes.

     The key takeaway is that by keeping these two interfaces separate, sink
     writers gain flexibility: they can mix and match different batching and
     flushing strategies. Since there is no tight coupling between them,
     different sinks can:
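
For illustration, a minimal sketch of what such a pluggable BatchCreator could 
look like (the interface name is taken from this thread; the generic parameter, 
method signature and the FirstN default below are assumptions, not the final 
FLIP-509 API):

import java.util.ArrayList;
import java.util.List;

// Illustrative only: a pluggable strategy for how buffered entries are grouped
// into the next batch once a flush has been triggered.
interface BatchCreator<RequestEntryT> {

    // Reads buffered entries and forms the next batch to send downstream,
    // e.g. keeping entries that share a partition key together.
    List<RequestEntryT> createNextBatch(List<RequestEntryT> bufferedEntries, int maxBatchSize);
}

// Behavior equivalent to today's AsyncSinkWriter: simply take the first
// maxBatchSize entries in arrival order.
final class FirstNBatchCreator<T> implements BatchCreator<T> {
    @Override
    public List<T> createNextBatch(List<T> bufferedEntries, int maxBatchSize) {
        int end = Math.min(maxBatchSize, bufferedEntries.size());
        return new ArrayList<>(bufferedEntries.subList(0, end));
    }
}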

[jira] [Created] (FLINK-37318) RocksDBKeyedStateBackend disposed before task exit

2025-02-13 Thread Feifan Wang (Jira)
Feifan Wang created FLINK-37318:
---

 Summary: RocksDBKeyedStateBackend disposed before task exit
 Key: FLINK-37318
 URL: https://issues.apache.org/jira/browse/FLINK-37318
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Affects Versions: 1.12.2
Reporter: Feifan Wang


We encountered two cases in the same job where the RocksDBKeyedStateBackend was 
disposed before the task exited, as shown in the following logs:
{code:java}
2024-12-01 12:12:12,703 INFO  
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Closed 
RocksDB State Backend. Cleaning up RocksDB working directory 
/data4/hadoop/yarn/nm-local-dir/usercache/hadoop-rt/appcache/application_1725359226161_2973711/flink-io-a9a5e6ba-f582-40e0-a6f1-f4221911de28/job_34a8565e51c2c2e22bae59f537e5775f_op_KeyedProcessOperator_174cfe0875f20e2c391c93234c3d8810__65_80__uuid_01262a91-5f77-4f57-9cc1-edf586a5d799.
2024-12-01 12:12:21,866 WARN  org.apache.flink.runtime.taskmanager.Task         
            - OverAggregate(partitionBy=[dpid, $18], orderBy=[stat_time ASC], 
window=[ ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], sele...id, imei, 
idfa, mac_address, client_ip, device_model, os_version, os, extension, 
stat_time, w0$o0 AS rnk, (w0$o0 - 1) AS $f19])) (65/80)#0 
(3e1d0680c28b9bc1e58fc5eaf341e6a9) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkRuntimeException: Error while deserializing the user 
key.
        at 
org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:486)
        at 
org.apache.flink.contrib.streaming.state.RocksDBMapState$1.next(RocksDBMapState.java:197)
        at 
org.apache.flink.table.runtime.operators.over.AbstractRowTimeUnboundedPrecedingOver.onTimer(AbstractRowTimeUnboundedPrecedingOver.java:203)
        at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:91)
        at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:70)
        at 
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302)
        at 
org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:194)
        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:626)
        at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:219)
        at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:196)
        at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105)
        at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:206)
        at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
        at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:400)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:629)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:767)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:578)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
        at 
org.apache.flink.core.memory.DataInputDeserializer.readLong(DataInputDeserializer.java:233)
        at 
org.apache.flink.api.common.typeutils.base.LongSerializer.deserialize(LongSerializer.java:72)
        at 
org.apache.flink.api.common.typeutils.base.LongSerializer.deserialize(LongSerializer.java:30)
        at 
org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserKey(RocksDBMapState.java:380)
        at 
org.apache.flink.contrib.streaming.state.RocksDBMapState.access$000(RocksDBMapState.java:64)
        at 
org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:483)
        ... 20 more
2024-12-01 12:12:21,868 INFO  org.apache.flink.runtime.taskmanager.Task         
            - Freeing task resources for OverAggregate(partitionBy=[dpid, $18], 
orderBy=[stat_time ASC], window=[ ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT 
ROW], sele...id, imei, idfa, mac_address, client_ip, device_model, os_version, 
os, extension, stat_time, w0$o0 AS 

Re: [VOTE] FLIP-508: Add support for Smile format for Compiled plans

2025-02-13 Thread Timo Walther

+1 (binding)

Thanks,
Timo


On 13.02.25 11:51, Sergey Nuyanzin wrote:

Hi everyone,

I'd like to start a vote on FLIP-508: Add support for Smile format for
Compiled plans [1]
which has been discussed in this thread [2].

The vote will be open for at least 72 hours unless there is an objection
or not enough votes.

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-508%3A+Add+support+for+Smile+format+for+Compiled+plans
[2] https://lists.apache.org/thread/jtcw002hnty29dnq9gykxkzy8gdkmb8j





Re: [VOTE] FLIP-508: Add support for Smile format for Compiled plans

2025-02-13 Thread David Radley
+1 (non-binding)

Kind regards, David.

From: Sergey Nuyanzin 
Date: Thursday, 13 February 2025 at 10:52
To: dev@flink.apache.org 
Subject: [EXTERNAL] [VOTE] FLIP-508: Add support for Smile format for Compiled 
plans
Hi everyone,

I'd like to start a vote on FLIP-508: Add support for Smile format for
Compiled plans [1]
which has been discussed in this thread [2].

The vote will be open for at least 72 hours unless there is an objection
or not enough votes.

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-508%3A+Add+support+for+Smile+format+for+Compiled+plans
[2] https://lists.apache.org/thread/jtcw002hnty29dnq9gykxkzy8gdkmb8j

--
Best regards,
Sergey



[jira] [Created] (FLINK-37316) Add StateManager#getStateOptional and StateManager#getState methods for DataStream V2

2025-02-13 Thread xuhuang (Jira)
xuhuang created FLINK-37316:
---

 Summary: Add StateManager#getStateOptional and 
StateManager#getState methods for DataStream V2
 Key: FLINK-37316
 URL: https://issues.apache.org/jira/browse/FLINK-37316
 Project: Flink
  Issue Type: New Feature
Reporter: xuhuang


Currently, the `StateManager` only provides the `getState` method. Below is the 
method signature:

/**
 * Get the specific list state.
 *
 * @param stateDeclaration of this state.
 * @return the list state corresponds to the state declaration.
 */
 <T> Optional<ListState<T>> getState(ListStateDeclaration<T> stateDeclaration) throws Exception;

In most scenarios, the `getState` method will return a non-empty instance. 
Therefore, we can consider providing another API that handles errors internally 
for users and returns the state directly, rather than wrapping it in an 
Optional.
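
A rough sketch of the resulting API shape could be (method names are taken from 
the issue title; the generics, stub types and exact signatures below are 
assumptions, not the real DataStream V2 interfaces):

import java.util.Optional;

// Illustrative stand-ins for the DataStream V2 types referenced above.
interface ListState<T> {}
interface ListStateDeclaration {}

interface StateManager {

    // Existing behavior: the caller handles the "state not available" case.
    <T> Optional<ListState<T>> getStateOptional(ListStateDeclaration stateDeclaration)
            throws Exception;

    // Proposed convenience: returns the state directly and handles errors
    // internally, throwing if the state cannot be provided.
    <T> ListState<T> getState(ListStateDeclaration stateDeclaration) throws Exception;
}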



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[VOTE] FLIP-508: Add support for Smile format for Compiled plans

2025-02-13 Thread Sergey Nuyanzin
Hi everyone,

I'd like to start a vote on FLIP-508: Add support for Smile format for
Compiled plans [1]
which has been discussed in this thread [2].

The vote will be open for at least 72 hours unless there is an objection
or not enough votes.

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-508%3A+Add+support+for+Smile+format+for+Compiled+plans
[2] https://lists.apache.org/thread/jtcw002hnty29dnq9gykxkzy8gdkmb8j

-- 
Best regards,
Sergey


Re: [VOTE] FLIP-508: Add support for Smile format for Compiled plans

2025-02-13 Thread Samrat Deb
+1 (non binding)

Bests,
Samrat
On Thu, 13 Feb 2025 at 5:01 PM, Fabian Hüske 
wrote:

> +1 (binding)
>
> Cheers, Fabian
>
> On Thu, Feb 13, 2025 at 12:07 PM David Radley 
> wrote:
>
> > +1 (non-binding)
> >
> > Kind regards, David.
> >
> > From: Sergey Nuyanzin 
> > Date: Thursday, 13 February 2025 at 10:52
> > To: dev@flink.apache.org 
> > Subject: [EXTERNAL] [VOTE] FLIP-508: Add support for Smile format for
> > Compiled plans
> > Hi everyone,
> >
> > I'd like to start a vote on FLIP-508: Add support for Smile format for
> > Compiled plans [1]
> > which has been discussed in this thread [2].
> >
> > The vote will be open for at least 72 hours unless there is an objection
> > or not enough votes.
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-508%3A+Add+support+for+Smile+format+for+Compiled+plans
> > [2] https://lists.apache.org/thread/jtcw002hnty29dnq9gykxkzy8gdkmb8j
> >
> > --
> > Best regards,
> > Sergey
> >
> >
>


[jira] [Created] (FLINK-37319) Add retry in RocksDBStateUploader for fault tolerant

2025-02-13 Thread Zhenqiu Huang (Jira)
Zhenqiu Huang created FLINK-37319:
-

 Summary: Add retry in RocksDBStateUploader for fault tolerant
 Key: FLINK-37319
 URL: https://issues.apache.org/jira/browse/FLINK-37319
 Project: Flink
  Issue Type: Improvement
Reporter: Zhenqiu Huang






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] Pluggable Batching for Async Sink in Flink

2025-02-13 Thread Ahmed Hamdy
FYI: Created FLIP-509.

Best Regards
Ahmed Hamdy


On Thu, 13 Feb 2025 at 16:30, Ahmed Hamdy  wrote:

> Hi Poorvank,
> thanks for the feedback, I can see your point and I kinda agree, I would
> say, if you don't need the flushing trigger interface in your use case
> let's keep it out of the FLIP for now, Also let's wait for the feedback
> about that from other members.
> I will create a FLIP and assign it a number.
>
> Best Regards
> Ahmed Hamdy
>
>
> On Thu, 13 Feb 2025 at 09:15, Poorvank Bhatia 
> wrote:
>
>> Hello Ahmed,
>> Thank you for your insights! In fact, FLIP-284
>> <
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-284+%3A+Making+AsyncSinkWriter+Flush+triggers+adjustable
>> >'s
>> proposal aligns so well with this approach, I have added it has the future
>> scope.
>> To answer your questions:
>>
>> I assume we will not expose batch creators to users but only to sink
>> implementers, am I correct?
>>
>>- *Yes, that is correct. The BatchCreator interface is intended to be
>>used only by sink implementers, not end users. Since the SinkWriter is
>>responsible for defining how records are batched and written, only
>> those
>>developing a custom sink would need access to this functionality.*
>>
>> It is nit: but I would prefer adding the creator to the
>> `AsyncSinkWriterConfiguration` rather than overloading the constructor,
>> there are some already implemented ways for instantiating the configs with
>> defaults so would be simpler.
>>
>>
>>- *Yes, BatchCreator can be included in AsyncSinkWriterConfiguration.
>>However, my initial reasoning for keeping it separate was that
>> BatchCreator
>>is not just a configuration parameter—it's a functional component that
>>directly influences how records are batched. This makes it more of a
>>behavioral dependency rather than a passive setting. That said, if
>>incorporating it into AsyncSinkWriterConfiguration aligns better with
>> the
>>overall design, it is a nit change and I'm open to updating the
>> approach
>>accordingly. Do let me know your thoughts, as RateLimitingStartegy is
>> also
>>a part of it.*
>>
>>
>>  I want to leverage some concept from FLIP-284 that we have previously
>> seen in DynamoDbAsyncSink, do you think we can also customize the trigger
>> so we can also add a "readyToFlush()" method or something similar and use
>> it in `nonBlockingFlush` methods instead of forcing the trigger on only
>> total record count or buffer size. I see this useful in your case as well
>> because if we have buffered 100 requests from 100 different partitions
>> then
>> we will keep triggering and emitting batches of 1 for 100 times (unless
>> maxTimeInBufferMS is used ofc) so you can use the custom trigger for
>> minimum buffered records per partition. This might be a bit tricky because
>> in order to not affect the performance we probably will wrap the whole
>> buffer in the batchCreator to maintain calculations like size in bytes or
>> per partition record count in the batch creator making it more
>> "bufferHandler" rather than a "batchCreator", let me know your thoughts
>> about this
>>
>>
>>-
>>
>>*You bring up a valid concern about optimizing flush triggers to
>> prevent
>>inefficient batch emissions when multiple partitions exist,
>> particularly in
>>sinks like DynamoDB and Cassandra. IMO flushing (when to send records)
>> and
>>batching (how to group records) should remain separate concerns, as
>> they
>>serve distinct purposes.*
>>
>>*To structure this properly, I’ll use FLIP-284 as the foundation:*
>>*🔹 BufferFlushTrigger (From FLIP-284) - Decides When to Flush*
>>
>>*The BufferFlushTrigger should be responsible for determining when the
>>buffer is ready to flush, based on configurable conditions such as:*
>>- *Total record count (batch size threshold).*
>>   - *Buffer size in bytes.*
>>   - *Time-based constraints (maxTimeInBufferMS), or any other custom
>>   logic a sink may require.*
>>
>>*🔹 BatchCreator - Decides How to Form a Batch*
>>
>>*Once BufferFlushTrigger determines that a flush should occur,
>>BatchCreator is responsible for reading the buffered requests and
>> forming a
>>batch based on partitioning rules.*
>>- *BatchCreator does not decide when to flush—it only handles how
>>   records are grouped when a flush is triggered.*
>>   - *This separation allows partition-aware batching, ensuring that
>>   records from the same partition are grouped together before
>> submission.*
>>--
>>*🔹 Example: BatchCreator with Two Partitioning Keys**Scenario*
>>   - *batchSize = 20*
>>   - *The buffer contains records from two partitions.*
>>   - *Assume either (a) we flush when at least 10 records exist per
>>   partition.*
>>   - *We apply a simple flush strategy that triggers a flush when 20
>>   total records are buffered.*
>>*How BatchCre

Re: [DISCUSS] FLIP-491: BundledAggregateFunction for batched aggregation

2025-02-13 Thread Timo Walther
Thank you for the update Alan. I just have some last minor comments, 
mostly around naming:


1. Consistency:

Could we rename the `canXX` methods to `supportsXX` methods? 
This might fit better to the existing methods such as 
`FunctionDefinition.supportsConstantFolding()` and other ability 
interfaces that we use in connectors.


Could we rename the table option `table.exec.agg-bundled` to 
`table.exec.bundled-agg`?  This fits better to existing options that 
also put the "agg" at the end e.g. `hash-agg` or `window-agg`.


2. Making the interface explicit

It shouldn't be a problem to explicitly add the 
`bundledAccumulateRetract()` method to the `BundledAggregateFunction`. 
The signature is not dynamic like accumulate()/retract(). We can add a 
default implementation such as:


default List bundledAccumulateRetract(List batch)
    throws Exception {
  throw new UnsupportedOperationException("Method must be implemented
      when supportsBundling() is true.");
}


Otherwise the FLIP seems to be in a good shape.

+1 for voting.

Regards,
Timo


On 07.02.25 22:02, Alan Sheinberg wrote:

Hi all,

Is there any more feedback I can incorporate before calling a vote?

Thanks,
Alan

On Tue, Jan 28, 2025 at 1:50 PM Alan Sheinberg 
wrote:


Hi Fabian,

I addressed your comments below.



* BundledKeySegement
   * should it be accumulatorS instead of accumulator?
   * should accumulators be null or an empty list if no accumulator is
present?



Good catch, forgot to update those places.  It should be "accumulators"
and an empty list when there are none. Updated in the FLIP.



* update the Group By Example to use a list of accumulator instead of a
single value



Updated.

* fix the `AvgAggregate` example:

   * add missing `canRetract()`, and `canMerge()` methods
   * example uses `batch()` method instead of `bundledAccumulateRetract()`
   * example works with a single accumulator, not a list of accumulators



Updated AvgAggregate to address these things.  They should be consistent
with the interface discussed.

* in general, check the code examples for compliance with the proposed API.

   * Some use `bundle()` instead of `bundledAccumulateRetract()`.
   * There might be other mistakes that sneaked when evolving the API



Yeah, you're right.  I'll read through it again and try to find any
missing updates.

Thanks,
Alan

On Tue, Jan 28, 2025 at 3:58 AM Fabian Hüske 
wrote:


Thanks Alan for updating the FLIP!

IMO, it looks good.

Just a few nits that would be great to fix for consistency:

* BundledKeySegement
   * should it be accumulatorS instead of accumulator?
   * should accumulators be null or an empty list if no accumulator is
present?
* update the Group By Example to use a list of accumulator instead of a
single value
* fix the `AvgAggregate` example:
   * add missing `canRetract()`, and `canMerge()` methods
   * example uses `batch()` method instead of `bundledAccumulateRetract()`
   * example works with a single accumulator, not a list of accumulators
* in general, check the code examples for compliance with the proposed
API.
   * Some use `bundle()` instead of `bundledAccumulateRetract()`.
   * There might be other mistakes that sneaked when evolving the API

Thank you,
Fabian


On Tue, Jan 28, 2025 at 12:54 AM Alan Sheinberg
 wrote:


Hi everyone,

Sorry for the delayed response!  I appreciate the comments.

Addressing Timo's comments:


Correct me if I'm wrong, but for AggregateFunctions implementing a
retract() and merge() is optional. How can a BundledAggregateFunction
communicate whether or not this is supported to the planner? Enforcing
the retract() feature in the interface specification could be an

option,

but esp for window aggregations there might not be a retract required.



This is a good catch.  Much of my experimenting with a POC was done

with a

retract call and it slipped my mind that it's optional.  I think this

will

have to be added to the interface  BundledAggregateFunction.

Also how do you plan to support merge() in this design? I couldn't find

any mentioning in the FLIP.


Searching through operators which used merge, I wasn't clear that I

would

require it in the implementation, so I didn't think it required

support.  I

now see it's used in windows and maybe elsewhere.  I'll add a list of
accumulators rather than a single one -- the first step will be to merge
accumulators before applying any of the accumulate or retract calls. I

need

to look more closely at the operators that will use them, but think it

may

make sense to do in this way.  Tell me if you feel strongly that they
should be separate method calls.

Addressing Fabian's comments:


* Why do we need the `canBundle()` function? We can use the interface
itself as a marker. A function that can't bundle, shouldn't implement

the

interface.
   * the interface could just contain the
`bundledAccumulateRetract(List bundle)` method?



I originally had it just like you're recommending, but it removes

Re: [DISCUSS] Pluggable Batching for Async Sink in Flink

2025-02-13 Thread Ahmed Hamdy
Hi Poorvank,
thanks for the feedback, I can see your point and I kinda agree, I would
say, if you don't need the flushing trigger interface in your use case
let's keep it out of the FLIP for now, Also let's wait for the feedback
about that from other members.
I will create a FLIP and assign it a number.

Best Regards
Ahmed Hamdy


On Thu, 13 Feb 2025 at 09:15, Poorvank Bhatia 
wrote:

> Hello Ahmed,
> Thank you for your insights! In fact, FLIP-284
> <
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-284+%3A+Making+AsyncSinkWriter+Flush+triggers+adjustable
> >'s
> proposal aligns so well with this approach, I have added it has the future
> scope.
> To answer your questions:
>
> I assume we will not expose batch creators to users but only to sink
> implementers, am I correct?
>
>- *Yes, that is correct. The BatchCreator interface is intended to be
>used only by sink implementers, not end users. Since the SinkWriter is
>responsible for defining how records are batched and written, only those
>developing a custom sink would need access to this functionality.*
>
> It is nit: but I would prefer adding the creator to the
> `AsyncSinkWriterConfiguration` rather than overloading the constructor,
> there are some already implemented ways for instantiating the configs with
> defaults so would be simpler.
>
>
>- *Yes, BatchCreator can be included in AsyncSinkWriterConfiguration.
>However, my initial reasoning for keeping it separate was that
> BatchCreator
>is not just a configuration parameter—it's a functional component that
>directly influences how records are batched. This makes it more of a
>behavioral dependency rather than a passive setting. That said, if
>incorporating it into AsyncSinkWriterConfiguration aligns better with
> the
>overall design, it is a nit change and I'm open to updating the approach
>accordingly. Do let me know your thoughts, as RateLimitingStartegy is
> also
>a part of it.*
>
>
>  I want to leverage some concept from FLIP-284 that we have previously
> seen in DynamoDbAsyncSink, do you think we can also customize the trigger
> so we can also add a "readyToFlush()" method or something similar and use
> it in `nonBlockingFlush` methods instead of forcing the trigger on only
> total record count or buffer size. I see this useful in your case as well
> because if we have buffered 100 requests from 100 different partitions then
> we will keep triggering and emitting batches of 1 for 100 times (unless
> maxTimeInBufferMS is used ofc) so you can use the custom trigger for
> minimum buffered records per partition. This might be a bit tricky because
> in order to not affect the performance we probably will wrap the whole
> buffer in the batchCreator to maintain calculations like size in bytes or
> per partition record count in the batch creator making it more
> "bufferHandler" rather than a "batchCreator", let me know your thoughts
> about this
>
>
>-
>
>*You bring up a valid concern about optimizing flush triggers to prevent
>inefficient batch emissions when multiple partitions exist,
> particularly in
>sinks like DynamoDB and Cassandra. IMO flushing (when to send records)
> and
>batching (how to group records) should remain separate concerns, as they
>serve distinct purposes.*
>
>*To structure this properly, I’ll use FLIP-284 as the foundation:*
>*🔹 BufferFlushTrigger (From FLIP-284) - Decides When to Flush*
>
>*The BufferFlushTrigger should be responsible for determining when the
>buffer is ready to flush, based on configurable conditions such as:*
>- *Total record count (batch size threshold).*
>   - *Buffer size in bytes.*
>   - *Time-based constraints (maxTimeInBufferMS), or any other custom
>   logic a sink may require.*
>
>*🔹 BatchCreator - Decides How to Form a Batch*
>
>*Once BufferFlushTrigger determines that a flush should occur,
>BatchCreator is responsible for reading the buffered requests and
> forming a
>batch based on partitioning rules.*
>- *BatchCreator does not decide when to flush—it only handles how
>   records are grouped when a flush is triggered.*
>   - *This separation allows partition-aware batching, ensuring that
>   records from the same partition are grouped together before
> submission.*
>--
>*🔹 Example: BatchCreator with Two Partitioning Keys**Scenario*
>   - *batchSize = 20*
>   - *The buffer contains records from two partitions.*
>   - *Assume either (a) we flush when at least 10 records exist per
>   partition.*
>   - *We apply a simple flush strategy that triggers a flush when 20
>   total records are buffered.*
>*How BatchCreator Forms the Batch*
>
>*Even though BufferFlushTrigger initiates the flush when 20 records are
>reached, BatchCreator must still decide how to structure the batch. It
> does
>so by:*
>1. *Reading the buffered records

[jira] [Created] (FLINK-37320) FINISHED jobs incorrectly being set to RECONCILING

2025-02-13 Thread Luca Castelli (Jira)
Luca Castelli created FLINK-37320:
-

 Summary: FINISHED jobs incorrectly being set to RECONCILING
 Key: FLINK-37320
 URL: https://issues.apache.org/jira/browse/FLINK-37320
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.10.0
 Environment: I've attached the flinkdeployment CR and operator-config 
I used to locally replicate.
Reporter: Luca Castelli
 Attachments: 
flink-kubernetes-operator-deploy-flink-kubernetes-operator-6f97d96777-8k2d4-1739457686217038000.log,
 operator-config.yaml, test-batch-job.yaml

Hello,

I believe we've found a bug within the observation logic for finite streaming 
or batch jobs. This is a follow-up to [this dev mailing list 
post](https://lists.apache.org/thread/xvsk4fmlqln092cdolvox4dgko0pw81k)
 # The job finishes successfully and the job status changes to FINISHED
 # TTL (kubernetes.operator.jm-deployment.shutdown-ttl) cleanup removes the JM 
deployments and clears HA configmap data
 # On the next loop, the observer sees MISSING JM and changes the job status 
from FINISHED to RECONCILING

The job had reached a terminal state. It shouldn't have been set back to 
RECONCILING.

This leads to an operator error later when a recovery attempt is triggered. The 
recovery is triggered because the JM is MISSING, the status is RECONCILING, the 
spec shows RUNNING, and HA is enabled. The recovery fails because 
validateHaMetadataExists throws UpgradeFailureException.

At that point the deployment gets stuck in a RECONCILING loop with 
UpgradeFailureException thrown on each cycle. I've attached operator logs 
showing this.

 

I think the fix would be to wrap 
[this](https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java#L155)
 in an if-statement that checks the job is not in a terminal state.
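
For illustration, a minimal sketch of the kind of guard suggested above (a 
hypothetical helper, not actual operator code; the real observer works against 
the CR status rather than this standalone class):

import org.apache.flink.api.common.JobStatus;

// Hypothetical helper only; not part of the flink-kubernetes-operator codebase.
final class TerminalStateGuard {

    // The observer would only reset the status to RECONCILING for a missing JM
    // when the last observed job status is not globally terminal (e.g. FINISHED).
    static boolean mayResetToReconciling(JobStatus lastObservedStatus) {
        return lastObservedStatus == null || !lastObservedStatus.isGloballyTerminalState();
    }
}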



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-37321) Make window functions with wrong input fail during planning rather than runtime

2025-02-13 Thread Sergey Nuyanzin (Jira)
Sergey Nuyanzin created FLINK-37321:
---

 Summary: Make window functions with wrong input fail during 
planning rather than runtime
 Key: FLINK-37321
 URL: https://issues.apache.org/jira/browse/FLINK-37321
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: Sergey Nuyanzin
Assignee: Sergey Nuyanzin






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-506: Support Reuse Multiple Table Sinks in Planner

2025-02-13 Thread Ron Liu
Hi, Xiangyu

>>> I prefer to set the default value of this option as 'false' in the first
place.  Setting as true might introduce unexpected behavior for users when
operating existing jobs. Maybe we should introduce this feature at first
and discuss enabling this feature as default in a separated thread. WDYT?

1. What unexpected behaviors do you think this might introduce?  For Sink
nodes, which are generally stateless, I intuitively understand that no
state compatibility issues will be introduced after Sink reuse.

2. Since Sink reuse benefits users, why not enable this feature by default
on the first day it is introduced? If your concern is potential unhandled
corner cases in the implementation, I consider those to be bugs. We should
prioritize fixing them rather than blocking the default enablement of this
optimization.

3. If we don't enable it by default now, when should we? What specific
milestones or actions are needed during the waiting period?  Your concerns
about unintended behaviors would still exist even if we enable it later.
Why delay resolving this in a separate discussion instead of finalizing it
here?

4. From our internal practice, users still want to enjoy the benefits of
this feature by default.


Best,
Ron

xiangyu feng  于2025年2月13日周四 15:57写道:

>  Hi Ron,
>
> Thx for quick response.
>
> - should the default value be true for the newly introduced option
> `table.optimizer.reuse-sink-enabled`?
>
> I prefer to set the default value of this option as 'false' in the first
> place.  Setting as true might introduce unexpected behavior for users when
> operating existing jobs. Maybe we should introduce this feature at first
> and discuss enabling this feature as default in a separated thread. WDYT?
>
> - have you considered the technical implementation options and are they
> feasible?
>
> Yes, we have already implemented the POC internally. It works well.
>
> Looking forward for your feedback.
>
> Best,
> Xiangyu
>
> Ron Liu  于2025年2月13日周四 14:55写道:
>
> > Hi, Xiangyu
> >
> > Thank you for proposing this FLIP, it's great work and looks very useful
> > for users.
> >
> > I have the following two questions regarding the content of the FLIP:
> > 1. Since sink reuse is very useful, should the default value be true for
> > the newly introduced option `table.optimizer.reuse-sink-enabled`, and
> > should the engine enable this optimization by default. Currently for
> source
> > reuse, the default value of  `sql.optimizer.reuse.table-source.enabled`
> > option is also true, which does not require user access by default, so I
> > think the engine should turn on Sink reuse optimization by default.
> > 2. Regarding Sink Digest, you mentioned disregarding the sink target
> > column, which I think is a very good suggestion, and very useful if it
> can
> > be done. I have a question: have you considered the technical
> > implementation options and are they feasible?
> >
> > Best,
> > Ron
> >
> > xiangyu feng  于2025年2月13日周四 12:56写道:
> >
> > > Hi all,
> > >
> > > Thank you all for the comments.
> > >
> > > If there is no further comment, I will open the voting thread in 3
> days.
> > >
> > > Regards,
> > > Xiangyu
> > >
> > > xiangyu feng  于2025年2月11日周二 14:17写道:
> > >
> > > > Link for Paimon LocalMerge Operator[1]
> > > >
> > > > [1]
> > > >
> > >
> >
> https://paimon.apache.org/docs/master/maintenance/write-performance/#local-merging
> > > >
> > > > xiangyu feng  于2025年2月11日周二 14:03写道:
> > > >
> > > >> Follow the above,
> > > >>
> > > >> "And for SinkWriter, the data structure to be processed should be
> > > fixed."
> > > >>
> > > >> I'm not very sure why the data structure of SinkWriter should be
> > fixed.
> > > >> Can you elaborate the scenario here?
> > > >>
> > > >>  "Is there a node or an operator to fill in the inconsistent field
> of
> > > >> Rowdata that passed from different Sources?"
> > > >>
> > > >> By `filling in the inconsistent field from different sources`, do
> you
> > > >> refer to implementations like the LocalMerge Operator [1] for
> Paimon?
> > > IMHO,
> > > >> this should not be included in the Sink Reuse. The merging behavior
> of
> > > >> multiple sources should be considered inside of the sink.
> > > >>
> > > >> Regards,
> > > >> Xiangyu Feng
> > > >>
> > > >> xiangyu feng  于2025年2月11日周二 13:46写道:
> > > >>
> > > >>> Hi Yanquan,
> > > >>>
> > > >>> Thx for reply. IIUC, the schema of CatalogTable should contain all
> > > >>> target columns for sources. If not, a SQL validation exception
> should
> > > be
> > > >>> raised for planner.
> > > >>>
> > > >>> Regards,
> > > >>> Xiangyu Feng
> > > >>>
> > > >>>
> > > >>>
> > > >>> Yanquan Lv  于2025年2月10日周一 16:25写道:
> > > >>>
> > >  Hi, Xiangyu. Thanks for driving this.
> > > 
> > >  I have a question to confirm:
> > >  Considering the case that different Sources use different
> > columns[1],
> > >  will the Schema of CatalogTable[2] contain all target columns for
> > > Sources?
> > >  And for SinkWriter, the data structure 

[jira] [Created] (FLINK-37323) Checkpoint Growth & Akka FrameSize Exception when Migrating from FlinkKafkaProducer to KafkaSink

2025-02-13 Thread Zimehr Abbasi (Jira)
Zimehr Abbasi created FLINK-37323:
-

 Summary: Checkpoint Growth & Akka FrameSize Exception when 
Migrating from FlinkKafkaProducer to KafkaSink
 Key: FLINK-37323
 URL: https://issues.apache.org/jira/browse/FLINK-37323
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.14.3
Reporter: Zimehr Abbasi


While migrating from {{FlinkKafkaProducer}} to {{{}KafkaSink{}}}, we used the 
same {{uid}} for both sinks to maintain continuity within our large distributed 
system. Initially, this was not an issue, but after multiple job restarts, we 
encountered an {*}Akka FrameSize exception{*}:

```
The rpc invocation size 16,234,343 exceeds the maximum akka framesize.
```

This occurred in a simple stream setup with a source and sink function, with no 
changing state. However, despite no explicit interaction with state, 
*checkpoint sizes kept increasing* with each restart. Upon deserializing the 
state, I found that the partition offsets in {{{}FlinkKafkaProducer{}}}’s 
{{next-transactional-id-hint-v2}} were growing continuously.

The root cause appears to be that {{next-transactional-id-hint-v2}} is stored 
as a {*}UnionListState{*}, meaning that upon each restart, the number of 
partition offsets in the state is {*}multiplied by the parallelism{*}, as all 
state is assigned to all operator subtasks.

This issue does not occur with {{FlinkKafkaProducer}} because it explicitly 
calls {{{}clear(){}}}, whereas {{KafkaSink}} does not interact with this state.
h4. Workarounds Considered
 * Setting a different {{uid}} for the two sinks avoids the issue but requires 
{{{}--allow-non-restored-state{}}}, which is not viable as we {*}cannot afford 
any data loss{*}.
 * Restarting the job from scratch resolves the issue but is {*}not ideal{*}.
 * Adding a *custom operator before {{KafkaSink}}* with the {{uid}} of 
{{FlinkKafkaProducer}} that acts as a *NOOP* and clears the old state before 
forwarding records.

h4. Question

What would be the recommended approach to safely transition from 
{{FlinkKafkaProducer}} to {{KafkaSink}} without accumulating unnecessary state 
and without requiring {{{}--allow-non-restored-state{}}}? Would introducing a 
NOOP operator to clear the legacy state be a valid approach?
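
As a purely illustrative sketch of the NOOP idea (not a verified migration
path): the state name below is taken from the description above, but the
element type is a placeholder and would have to be compatible with the
serializer FlinkKafkaProducer actually registered for that state.

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

// Pass-through operator that takes over the old FlinkKafkaProducer uid,
// restores the legacy union-list state once and clears it, so it is no longer
// re-snapshotted (and multiplied by the parallelism) on later restarts.
public class LegacyProducerStateCleaner<T> extends RichMapFunction<T, T>
        implements CheckpointedFunction {

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // Placeholder element type: the descriptor has to be compatible with
        // the serializer FlinkKafkaProducer used for this state.
        ListStateDescriptor<byte[]> legacyState =
                new ListStateDescriptor<>("next-transactional-id-hint-v2", byte[].class);
        context.getOperatorStateStore().getUnionListState(legacyState).clear();
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) {
        // Nothing to snapshot; the legacy state stays cleared.
    }

    @Override
    public T map(T value) {
        return value; // NOOP: just forward records to the new KafkaSink downstream.
    }
}
```

The operator would be given the old sink's uid, e.g.
stream.map(new LegacyProducerStateCleaner<>()).uid("<old FlinkKafkaProducer uid>"),
with the new KafkaSink downstream under a fresh uid. Since the old uid is still
present in the job graph, restore should not require --allow-non-restored-state,
which is the premise of the workaround described above.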



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-506: Support Reuse Multiple Table Sinks in Planner

2025-02-13 Thread xiangyu feng
Hi Ron,

On second thought, given that sink reuse is a long-awaited feature with
significant benefits for users, I agree to enable it by default from the
start. Similar features such as `table.optimizer.reuse-sub-plan-enabled` and
`table.optimizer.reuse-source-enabled` are also enabled by default, so sink
reuse should follow the same convention.

The FLIP[1] has been revised accordingly. Thanks for the suggestion.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-506%3A+Support+Reuse+Multiple+Table+Sinks+in+Planner

Regards,
Xiangyu
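
For readers following along: the proposed switch would sit next to the existing
reuse options. A hypothetical Table API snippet (the sink option below is only
proposed in FLIP-506 and is not available in any released Flink version yet):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SinkReuseConfigExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Proposed in FLIP-506 (assumed default: true once released);
        // set to 'false' to opt out of merging identical sinks.
        tEnv.getConfig().getConfiguration()
                .setString("table.optimizer.reuse-sink-enabled", "false");

        // Existing analog for sources, enabled by default in released versions.
        tEnv.getConfig().getConfiguration()
                .setString("table.optimizer.reuse-source-enabled", "true");
    }
}
```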




Ron Liu  于2025年2月14日周五 12:10写道:

> Hi, Xiangyu
>
> >>> I prefer to set the default value of this option as 'false' in the first
> place.  Setting as true might introduce unexpected behavior for users when
> operating existing jobs. Maybe we should introduce this feature at first
> and discuss enabling this feature as default in a separated thread. WDYT?
>
> 1. What unexpected behaviors do you think this might introduce?  For Sink
> nodes, which are generally stateless, I intuitively understand that no
> state compatibility issues will be introduced after Sink reuse.
>
> 2. Since Sink reuse benefits users, why not enable this feature by default
> on the first day it is introduced? If your concern is potential unhandled
> corner cases in the implementation, I consider those to be bugs. We should
> prioritize fixing them rather than blocking the default enablement of this
> optimization.
>
> 3. If we don't enable it by default now, when should we? What specific
> milestones or actions are needed during the waiting period?  Your concerns
> about unintended behaviors would still exist even if we enable it later.
> Why delay resolving this in a separate discussion instead of finalizing it
> here?
>
> 4. From our internal practice, users still want to enjoy the benefits of
> this feature by default.
>
>
> Best,
> Ron
>
> xiangyu feng  于2025年2月13日周四 15:57写道:
>
> >  Hi Ron,
> >
> > Thx for quick response.
> >
> > - should the default value be true for the newly introduced option
> > `table.optimizer.reuse-sink-enabled`?
> >
> > I prefer to set the default value of this option as 'false' in the first
> > place.  Setting as true might introduce unexpected behavior for users
> when
> > operating existing jobs. Maybe we should introduce this feature at first
> > and discuss enabling this feature as default in a separated thread. WDYT?
> >
> > - have you considered the technical implementation options and are they
> > feasible?
> >
> > Yes, we have already implemented the POC internally. It works well.
> >
> > Looking forward for your feedback.
> >
> > Best,
> > Xiangyu
> >
> > Ron Liu  于2025年2月13日周四 14:55写道:
> >
> > > Hi, Xiangyu
> > >
> > > Thank you for proposing this FLIP, it's great work and looks very
> useful
> > > for users.
> > >
> > > I have the following two questions regarding the content of the FLIP:
> > > 1. Since sink reuse is very useful, should the default value be true
> for
> > > the newly introduced option `table.optimizer.reuse-sink-enabled`, and
> > > should the engine enable this optimization by default. Currently for
> > source
> > > reuse, the default value of  `sql.optimizer.reuse.table-source.enabled`
> > > option is also true, which does not require user access by default, so
> I
> > > think the engine should turn on Sink reuse optimization by default.
> > > 2. Regarding Sink Digest, you mentioned disregarding the sink target
> > > column, which I think is a very good suggestion, and very useful if it
> > can
> > > be done. I have a question: have you considered the technical
> > > implementation options and are they feasible?
> > >
> > > Best,
> > > Ron
> > >
> > > xiangyu feng  于2025年2月13日周四 12:56写道:
> > >
> > > > Hi all,
> > > >
> > > > Thank you all for the comments.
> > > >
> > > > If there is no further comment, I will open the voting thread in 3
> > days.
> > > >
> > > > Regards,
> > > > Xiangyu
> > > >
> > > > xiangyu feng  于2025年2月11日周二 14:17写道:
> > > >
> > > > > Link for Paimon LocalMerge Operator[1]
> > > > >
> > > > > [1]
> > > > >
> > > >
> > >
> >
> https://paimon.apache.org/docs/master/maintenance/write-performance/#local-merging
> > > > >
> > > > > xiangyu feng  于2025年2月11日周二 14:03写道:
> > > > >
> > > > >> Follow the above,
> > > > >>
> > > > >> "And for SinkWriter, the data structure to be processed should be
> > > > fixed."
> > > > >>
> > > > >> I'm not very sure why the data structure of SinkWriter should be
> > > fixed.
> > > > >> Can you elaborate the scenario here?
> > > > >>
> > > > >>  "Is there a node or an operator to fill in the inconsistent field
> > of
> > > > >> Rowdata that passed from different Sources?"
> > > > >>
> > > > >> By `filling in the inconsistent field from different sources`, do
> > you
> > > > >> refer to implementations like the LocalMerge Operator [1] for
> > Paimon?
> > > > IMHO,
> > > > >> this should not be included in the Sink Reuse. The merging
> behavior
> > of
> > > > >> multiple sources should be considered i

Re: Migrating CI to Github Actions

2025-02-13 Thread Matthias Pohl
I wasn't able to continue with FLIP-396 [1] and didn't have the time to
finish off FLINK-33901 [2]. Whoever wants to help out with that effort can
pick up the subtasks in FLINK-33901 [2]. I tried to document all the things
in the individual tickets. I'm happy to answer questions around that topic.

There is a kind-of blocker with FLINK-34331 [3] if we want to switch to
Apache INFRA's ephemeral runners. But that could be handled independently,
I guess.

To answer your questions:
1) There have been a few discussions initiated offline to pick up the work,
but so far they haven't resulted in the GHA work making progress.
2) As said above: You can go ahead and continue working on FLIP-396 [1].
The FLIP's ML discussion marks the current state of the community's view on
it as far as I know.
3) Functionality-wise that should be fine: GHA utilizes the same scripts
the Azure pipeline uses.

Best,
Matthias

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-396%3A+Trial+to+test+GitHub+Actions+as+an+alternative+for+Flink%27s+current+Azure+CI+infrastructure
[2] https://issues.apache.org/jira/browse/FLINK-33901
[3] https://issues.apache.org/jira/browse/FLINK-34331

On Thu, Feb 13, 2025 at 2:21 PM Tom Cooper  wrote:

> Hi all,
>
> I was hoping to sync up on the progress on moving from Azure CI to GitHub
> Actions for the main Flink repository. There was FLIP-396 [1] by Matthias
> Pohl detailing the plan to trial GitHub Actions and FLINK-27075 [2] which
> tracks the work. There are several workflows in the repo already and
> actions do seem to be running [3]. The FLIP mentions the 1.19 release as
> the watershed for deciding if we move off Azure CI. As we are now nearly at
> 2.0, I wondered if there has been any more discussion on this?
>
> My main motivation for asking is that, as part of Community Health
> Initiative (CHI) Workgroup [4], we have been looking at how to further
> speed up/simplify PR reviews. One of the main issues with reviews is that
> the PR has failed to pass the CI tests and in many cases that will be
> something simple like a failure of the checkstyle/spotless checks. However,
> to find that out you need to click through several layers of Azure CI UI
> and parse the test logs.
>
> It would be useful if we could run these standard linting checks for every
> PR before the main CI is run and make that clearly visible to the submitter
> on the PR via the GH CI UI integration (green ticks or red crosses with a
> clear reason). GitHub Actions seems like a perfect fit for this and indeed
> we already have a workflow for pre-compile checks [5] that would perform
> this. However, that workflow does not run on pull requests.
>
> So I was wondering:
>
> 1) Has there been a discussion on moving forward with the move to GH
> Actions?
> 2) If the process has stalled due to a lack of developer time, then the
> CHI members are willing to help but we may need context/help from those
> previously involved.
> 3) As a minimum, would we be able to enable the pre-compile checks for all
> PRs?
>
> Thanks,
>
> Tom Cooper
> @tomcooper.dev | https://tomcooper.dev
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-396%3A+Trial+to+test+GitHub+Actions+as+an+alternative+for+Flink%27s+current+Azure+CI+infrastructure
> [2] https://issues.apache.org/jira/browse/FLINK-27075
> [3] https://github.com/apache/flink/actions
> [4]
> https://cwiki.apache.org/confluence/display/FLINK/Community+Health+Initiative+%28CHI%29+workgroup
> [5]
> https://github.com/apache/flink/blob/master/.github/workflows/template.pre-compile-checks.yml
>