[FLIP] Forward a flip to introduce minibatch optimization for Join

2024-01-10 Thread shuai xu
Hi all,

Currently, when performing cascading joins in Flink, there is a pain 
point of record amplification, as mentioned in a discussion similar to 
https://lists.apache.org/thread/2021fmwhtotl0okmtyc5b7tndlp3khf9.
I have implemented a POC of the optimization and it works. I would like to 
propose a FLIP to explain my plan, but I have no permission to create pages 
in the Flink Improvement Proposals [1] space. I may need a PMC member to help 
me add permissions. My Jira account is xu_shuai_ and my email is 
xushuai...@gmail.com. Thanks!

[1] https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals

[DISCUSS] FLIP-415: Introduce a new join operator to support minibatch

2024-01-10 Thread shuai xu
Hi devs,

I’d like to start a discussion on FLIP-415: Introduce a new join operator to 
support minibatch[1].

Currently, when performing cascading joins in Flink, there is a pain 
point of record amplification. Every record the join operator receives 
triggers the join process. However, if a +I record and a matching -D record 
arrive, they can be folded to save two join processes. Besides, a -U/+U pair 
might output 4 records, two of which are redundant, in the case of an outer 
join.

To address this issue, this FLIP introduces a new 
MiniBatchStreamingJoinOperator to achieve batch processing, which could reduce 
the number of redundant output messages and avoid unnecessary join processes. 
A new option is added to control the operator so that existing jobs are not 
affected.
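
To make the folding concrete, below is a minimal illustrative sketch (not the 
operator code proposed in the FLIP) of a per-key minibatch buffer that cancels 
a matching +I/-D pair and collapses a -U/+U pair before the join runs. Row and 
RowKind are Flink's existing changelog types; the class itself is hypothetical:

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.flink.types.Row;
    import org.apache.flink.types.RowKind;

    // Illustrative sketch only: buffers records per join key within one
    // minibatch and folds redundant changelog pairs before the join runs.
    public class FoldingBufferSketch {
        private final Map<Row, Deque<Row>> bufferPerKey = new HashMap<>();

        public void add(Row joinKey, Row record) {
            Deque<Row> buffered =
                    bufferPerKey.computeIfAbsent(joinKey, k -> new ArrayDeque<>());
            Row last = buffered.peekLast();
            if (last != null && last.getKind() == RowKind.INSERT
                    && record.getKind() == RowKind.DELETE) {
                // +I followed by a matching -D (field comparison elided):
                // both fold away, saving two join processes.
                buffered.pollLast();
            } else if (last != null && last.getKind() == RowKind.UPDATE_BEFORE
                    && record.getKind() == RowKind.UPDATE_AFTER) {
                // -U followed by +U: treat as a single update so an outer
                // join emits 2 records instead of 4.
                buffered.pollLast();
                buffered.addLast(record);
            } else {
                buffered.addLast(record);
            }
        }
    }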

Please find more details in the FLIP wiki document [1]. Looking
forward to your feedback.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-415%3A+Introduce+a+new+join+operator+to+support+minibatch

Best,
Xu Shuai


Re: [DISCUSS] FLIP-415: Introduce a new join operator to support minibatch

2024-01-10 Thread shuai xu
Thanks for your response, Benchao.

Here is my thought on the newly added option.
Users' current jobs are running on a version without minibatch join. If the 
existing mini-batch option were reused to enable minibatch join, then when 
users' jobs are migrated to the new version, the internal behavior of the join 
operation within the jobs would change. Although the semantics of the changelog 
emitted by the Join operator is eventual consistency, the change might not be 
expected by downstream consumers of the job that require the details of the 
changelog. The newly added option also follows the precedent of 
'table.exec.deduplicate.mini-batch.compact-changes-enabled'. 

As for the implementation, the new operator shares the state of the original 
operator; it merely adds a minibatch buffer for storing records to perform the 
optimization. The storage remains consistent, and there are only minor 
modifications to the computational logic.

Best,
Xu Shuai

> 2024年1月10日 22:56,Benchao Li  写道:
> 
> Thanks shuai for driving this, mini-batch Join is a very useful
> optimization, +1 for the general idea.
> 
> Regarding the configuration
> "table.exec.stream.join.mini-batch-enabled", I'm not sure it's really
> necessary. The semantic of changelog emitted by the Join operator is
> eventual consistency, so there is no much difference between original
> Join and mini-batch Join from this aspect. Besides, introducing more
> options would make it more complex for users, harder to understand and
> maintain, which we should be careful about.
> 
> One thing about the implementation, could you make the new operator
> share the same state definition with the original one?
> 
> shuai xu  于2024年1月10日周三 21:23写道:
>> 
>> Hi devs,
>> 
>> I’d like to start a discussion on FLIP-415: Introduce a new join operator to 
>> support minibatch[1].
>> 
>> Currently, when performing cascading connections in Flink, there is a pain 
>> point of record amplification. Every record join operator receives would 
>> trigger join process. However, if records of +I and -D matches , they could 
>> be folded to reduce two times of join process. Besides, records of  -U +U 
>> might output 4 records in which two records are redundant when encountering 
>> outer join .
>> 
>> To address this issue, this FLIP introduces a new  
>> MiniBatchStreamingJoinOperator to achieve batch processing which could 
>> reduce number of outputting redundant messages and avoid unnecessary join 
>> processes.
>> A new option is added to control the operator to avoid influencing existing 
>> jobs.
>> 
>> Please find more details in the FLIP wiki document [1]. Looking
>> forward to your feedback.
>> 
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-415%3A+Introduce+a+new+join+operator+to+support+minibatch
>> 
>> Best,
>> Xu Shuai
> 
> 
> 
> -- 
> 
> Best,
> Benchao Li



Re: [DISCUSS] FLIP-415: Introduce a new join operator to support minibatch

2024-01-10 Thread shuai xu
Hi Jane,

Thanks for your reminder! I missed this.

I have updated the FLIP with the UML of MiniBatchStreamingJoinOperator and 
linked my POC implementation as a reference. 
They are placed in the Proposed Changes section. 

Best,
Xu Shuai



> 2024年1月11日 11:18,Jane Chan  写道:
> 
> Hi shuai,
> 
> Thanks for initiating the discussion. The mini-batch join optimization is
> very helpful, particularly for optimizing outer join conditions in CDC
> sources and handling cascade joins. And +1 for the proposal.
> 
> However, I don't see any details on the proposed
> "MiniBatchStreamingJoinOperator",  would you mind elaborating more about it?
> 
> Best,
> Jane
> 
> 
> On Wed, Jan 10, 2024 at 10:56 PM Benchao Li  wrote:
> 
>> Thanks shuai for driving this, mini-batch Join is a very useful
>> optimization, +1 for the general idea.
>> 
>> Regarding the configuration
>> "table.exec.stream.join.mini-batch-enabled", I'm not sure it's really
>> necessary. The semantic of changelog emitted by the Join operator is
>> eventual consistency, so there is no much difference between original
>> Join and mini-batch Join from this aspect. Besides, introducing more
>> options would make it more complex for users, harder to understand and
>> maintain, which we should be careful about.
>> 
>> One thing about the implementation, could you make the new operator
>> share the same state definition with the original one?
>> 
>> shuai xu  于2024年1月10日周三 21:23写道:
>>> 
>>> Hi devs,
>>> 
>>> I’d like to start a discussion on FLIP-415: Introduce a new join
>> operator to support minibatch[1].
>>> 
>>> Currently, when performing cascading connections in Flink, there is a
>> pain point of record amplification. Every record join operator receives
>> would trigger join process. However, if records of +I and -D matches , they
>> could be folded to reduce two times of join process. Besides, records of
>> -U +U might output 4 records in which two records are redundant when
>> encountering outer join .
>>> 
>>> To address this issue, this FLIP introduces a new
>> MiniBatchStreamingJoinOperator to achieve batch processing which could
>> reduce number of outputting redundant messages and avoid unnecessary join
>> processes.
>>> A new option is added to control the operator to avoid influencing
>> existing jobs.
>>> 
>>> Please find more details in the FLIP wiki document [1]. Looking
>>> forward to your feedback.
>>> 
>>> [1]
>>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-415%3A+Introduce+a+new+join+operator+to+support+minibatch
>>> 
>>> Best,
>>> Xu Shuai
>> 
>> 
>> 
>> --
>> 
>> Best,
>> Benchao Li
>> 



Re: [FLIP] Forward a flip to introduce minibatch optimization for Join

2024-01-10 Thread shuai xu
Hi Leonard,

Thanks very much for your help!

I have already started the discussion on FLIP-415: Introduce a new join 
operator to support minibatch [1]. 
I'm looking forward to your feedback if you have some spare time to take a 
look.


[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-415%3A+Introduce+a+new+join+operator+to+support+minibatch

Best,
Xu Shuai

> 2024年1月10日 17:47,Leonard Xu  写道:
> 
> Hey, shuai 
> 
> I’ve added wiki permission for you, looking forward your streaming join 
> optimization.
> 
> Best,
> Leonard




Re: [DISCUSS] FLIP-415: Introduce a new join operator to support minibatch

2024-01-11 Thread shuai xu
Suppose we currently have a job that joins two CDC sources after de-duplicating 
them, and the output is used for audit analysis; the user has turned off the 
parameter "table.exec.deduplicate.mini-batch.compact-changes-enabled" to ensure 
that no update details are lost. If we don't introduce this parameter, then 
after the user upgrades the version, some update details may be lost because 
the mini-batch join is enabled by default, resulting in distorted audit 
results.

> 2024年1月11日 16:19,Benchao Li  写道:
> 
>> the change might not be supposed for the downstream of the job which 
>> requires details of changelog
> 
> Could you elaborate on this a bit? I've never met such kinds of
> requirements before, I'm curious what is the scenario that requires
> this.
> 
> shuai xu  于2024年1月11日周四 13:08写道:
>> 
>> Thanks for your response, Benchao.
>> 
>> Here is my thought on the newly added option.
>> Users' current jobs are running on a version without minibatch join. If the 
>> existing option to enable minibatch join is utilized, then when users' jobs 
>> are migrated to the new version, the internal behavior of the join operation 
>> within the jobs will change. Although the semantic of changelog emitted by 
>> the Join operator is eventual consistency, the change might not be supposed 
>> for the downstream of the job which requires details of changelog. This 
>> newly added option also refers to 
>> 'table.exec.deduplicate.mini-batch.compact-changes-enabled'.
>> 
>> As for the implementation,The new operator shares the state of the original 
>> operator and it merely has an additional minibatch for storing records to do 
>> some optimization. The storage remains consistent, and there is minor 
>> modification to the computational logic.
>> 
>> Best,
>> Xu Shuai
>> 
>>> 2024年1月10日 22:56,Benchao Li  写道:
>>> 
>>> Thanks shuai for driving this, mini-batch Join is a very useful
>>> optimization, +1 for the general idea.
>>> 
>>> Regarding the configuration
>>> "table.exec.stream.join.mini-batch-enabled", I'm not sure it's really
>>> necessary. The semantic of changelog emitted by the Join operator is
>>> eventual consistency, so there is no much difference between original
>>> Join and mini-batch Join from this aspect. Besides, introducing more
>>> options would make it more complex for users, harder to understand and
>>> maintain, which we should be careful about.
>>> 
>>> One thing about the implementation, could you make the new operator
>>> share the same state definition with the original one?
>>> 
>>> shuai xu  于2024年1月10日周三 21:23写道:
>>>> 
>>>> Hi devs,
>>>> 
>>>> I’d like to start a discussion on FLIP-415: Introduce a new join operator 
>>>> to support minibatch[1].
>>>> 
>>>> Currently, when performing cascading connections in Flink, there is a pain 
>>>> point of record amplification. Every record join operator receives would 
>>>> trigger join process. However, if records of +I and -D matches , they 
>>>> could be folded to reduce two times of join process. Besides, records of  
>>>> -U +U might output 4 records in which two records are redundant when 
>>>> encountering outer join .
>>>> 
>>>> To address this issue, this FLIP introduces a new  
>>>> MiniBatchStreamingJoinOperator to achieve batch processing which could 
>>>> reduce number of outputting redundant messages and avoid unnecessary join 
>>>> processes.
>>>> A new option is added to control the operator to avoid influencing 
>>>> existing jobs.
>>>> 
>>>> Please find more details in the FLIP wiki document [1]. Looking
>>>> forward to your feedback.
>>>> 
>>>> [1]
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-415%3A+Introduce+a+new+join+operator+to+support+minibatch
>>>> 
>>>> Best,
>>>> Xu Shuai
>>> 
>>> 
>>> 
>>> --
>>> 
>>> Best,
>>> Benchao Li
>> 
> 
> 
> -- 
> 
> Best,
> Benchao Li

Best, 
Xu Shuai



Re: [DISCUSS] FLIP-415: Introduce a new join operator to support minibatch

2024-01-12 Thread shuai xu
Hi all,

The point I want to highlight is that minibatch join could potentially yield an 
incomplete changelog, which existing jobs may not expect. For example, a 
scenario that joins two CDC sources after de-duplicating them, where the output 
is used for audit analysis, could not accept an incomplete changelog, even 
though the minibatch processing itself does not introduce any correctness 
problem. 

The internal behavior of minibatch processing is not well-defined now. I don't 
think reusing the minibatch option for minibatch join is problematic, but 
precise control is necessary to mitigate the risk of generating an incomplete 
changelog within a minibatch. 

Controlling the changelog behavior within a minibatch should be a global 
option. Therefore, I propose introducing a new option 
'table.exec.mini-batch.compact-changes-enabled' to precisely control changelog 
compaction within a minibatch, and then deprecating the option 
'table.exec.deduplicate.mini-batch.compact-changes-enabled'. The deduplicate 
operator would fall back to the newly introduced option, and the minibatch 
join would follow it as well. 
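
To illustrate the intent, here is a sketch for the audit-style job above 
(note that 'table.exec.mini-batch.compact-changes-enabled' is only proposed in 
this thread, not an existing Flink option):

    import org.apache.flink.configuration.Configuration;

    public class AuditJobConfigSketch {
        public static Configuration auditJobConfig() {
            Configuration conf = new Configuration();
            // Keep mini-batch on for the batching benefits.
            conf.setString("table.exec.mini-batch.enabled", "true");
            // Proposed global switch (sketch only): disable changelog
            // compaction so downstream consumers that need every update
            // detail, e.g. an audit job, still receive all of them.
            conf.setString("table.exec.mini-batch.compact-changes-enabled", "false");
            return conf;
        }
    }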


> 2024年1月12日 16:30,Jane Chan  写道:
> 
> Hi shuai,
> 
> Thanks for the update! Regarding the newly introduced configuration, I hold
> the same concern with Benchao and Xuyang.
> 
> First of all, in most cases, the fact that users choose to enable
> mini-batch configuration indicates they are aware of the trade-off between
> throughput and completeness of the changelog.
> And if we finally adopt this configuration solely to avoid state
> incompatibility, does it mean that we will need to introduce a new
> configuration for every future operator's mini-batch optimization, similar
> to what we did today?
> 
> Best,
> Jane
> 
> On Fri, Jan 12, 2024 at 1:45 PM Xuyang  wrote:
> 
>> Hi, Xu Shuai. Thanks for driving this flip.
>> 
>> 
>> The CDC message amplification of cascade join has always been a problem
>> for users. Judging from the
>> nexmark results, this optimization is very meaningful. I just have the
>> same doubts as Benchao, why can't we
>> use minibatch join as the default behavior when the user turns on
>> minibatch?
>> 
>> 
>>> Although the semantic of changelog emitted by the Join operator is
>> eventual consistency, the change might
>> not be supposed for the downstream of the job which requires details of
>> changelog.
>> 
>> 
>> I think if the user adds the minibatch options to his job to enable
>> minibatch, he should know that flink will reduce
>> the amount of data sent to downstream by folding CDC messages as much as
>> possible. In scenarios where all
>> details of CDC records need to be retained, such as just synchronizing
>> data with jobs from one db to another db,
>> users have no reason to enable minibatch.
>> 
>> 
>> The only scenario I can think of that requires adding this independent
>> minibatch join option is to ensure that the state
>> is compatible between multiple versions, but we have not promised users
>> state compatibility during cross-version upgrades.
>> 
>> 
>> Maybe we need to figure it out why does the
>> 'table.exec.deduplicate.mini-batch.compact-changes-enabled' option need to
>> be added to deduplicate operator? I think this is the same reason as
>> adding a separate parameter to join to control CDC message folding.
>> 
>> 
>> 
>> 
>> --
>> 
>>Best!
>>Xuyang
>> 
>> 
>> 
>> 
>> 
>> 在 2024-01-11 16:19:30,"Benchao Li"  写道:
>>>> the change might not be supposed for the downstream of the job which
>> requires details of changelog
>>> 
>>> Could you elaborate on this a bit? I've never met such kinds of
>>> requirements before, I'm curious what is the scenario that requires
>>> this.
>>> 
>>> shuai xu  于2024年1月11日周四 13:08写道:
>>>> 
>>>> Thanks for your response, Benchao.
>>>> 
>>>> Here is my thought on the newly added option.
>>>> Users' current jobs are running on a version without minibatch join. If
>> the existing option to enable minibatch join is utilized, then when users'
>> jobs are migrated to the new version, the internal behavior of the join
>> operation within the jobs will change. Although the semantic of changelog
>> emitted by the Join operator is eventual consistency, the change might not
>> be supposed for the downstream of the job which requires details of
>> changelog. This newly added option also refers to
>> 'table.exec.deduplicate.mini-b

[DISCUSS] Support notFollowedBy with interval as the last part of a Pattern

2020-02-11 Thread Shuai Xu
Hi all,
CEP is broadly used in more and more applications now. In many cases, users
need to use a pattern like CEP.begin().notFollowedBy(). For example, they may
want to find the users who created an order but didn't pay within 10 minutes,
and so on.

However, CEP doesn't support notFollowedBy() as the last part of a pattern
now. So I propose to enable it as follows:

If the pattern ends with notFollowedBy() together with a time interval
within(t), we take it as a valid pattern. This pattern will be triggered at
time t after the begin stage if the preceding pattern is matched and the
notFollowedBy() part doesn't appear during the interval.

For example, Pattern.begin("start").where(event.getId() ==
1).notFollowedBy("not").where(event.getId() == 2).within(Time.minutes(10))
would become a valid pattern. If the ids of the input events are 1, 3, 3...,
then 10 minutes after receiving the event with id 1, it will produce a match
containing 1.
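
Spelled out with the existing flink-cep API, such a pattern might look like 
the following sketch (the Event class and its getId() accessor are assumptions 
for illustration):

    import org.apache.flink.cep.pattern.Pattern;
    import org.apache.flink.cep.pattern.conditions.SimpleCondition;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class UnpaidOrderPattern {
        // A hypothetical Event type with a getId() accessor is assumed.
        public static Pattern<Event, Event> unpaidOrders() {
            return Pattern.<Event>begin("start")
                    .where(new SimpleCondition<Event>() {
                        @Override
                        public boolean filter(Event e) {
                            return e.getId() == 1; // order created
                        }
                    })
                    // Under this proposal, ending the pattern with
                    // notFollowedBy(...) plus within(...) becomes valid; the
                    // match fires once the interval elapses with no payment.
                    .notFollowedBy("not")
                    .where(new SimpleCondition<Event>() {
                        @Override
                        public boolean filter(Event e) {
                            return e.getId() == 2; // payment
                        }
                    })
                    .within(Time.minutes(10));
        }
    }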

This change does not add any new public interface; it only makes some
previously invalid patterns valid.

The detailed implementation design is in:
https://docs.google.com/document/d/1swUSHcVxbkWm7EPdOfOQXWj-A4gGDA8Y8R1DOUjokds/edit#

Similar requirements from users can be found in:
https://issues.apache.org/jira/browse/FLINK-9431?filter=12347662

Please let me know if you have any questions or suggestions to improve this
proposal.


Re: [DISCUSS] Unified Core API for Streaming and Batch

2018-12-07 Thread Shuai Xu
Hi all
Glad to see this discussion. We are now designing enhancements to the
scheduling of batch jobs, and a unified API will help a lot.

Haibo Sun  于2018年12月5日周三 下午4:45写道:

> Hi all,
>
> Thank Kurt, you see more benefits of the unification than I do.
>
> I quite agree Kurt's views. DataStream, DataSet and Table are remained
> independent for now, and subsumed DataSet in data stream in the future. The
> collection execution mode is replaced by mini cluster. The high-level
> semantic APIs  have their own optimizations, but StreamTransformation does
> not.
>
> About iterations, I have not more ideas at the moment.
>
>
> Best,
> Haibo
>
>
>
> --
> Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
>


Re: [DISCUSS] Temporarily remove support for job rescaling via CLI action "modify"

2019-04-24 Thread Shuai Xu
Will we only remove the command support on the client side, or will the code
in the job master also be removed?

Till Rohrmann  于2019年4月24日周三 下午4:12写道:

> +1 for temporarily removing support for the modify command.
>
> Eventually, we have to add it again in order to support auto scaling. The
> next time we add it, we should address the known limitations.
>
> Cheers,
> Till
>
> On Wed, Apr 24, 2019 at 9:06 AM Paul Lam  wrote:
>
> > Hi Gary,
> >
> > + 1 to remove it for now. Actually some users are not aware of that it’s
> > still experimental, and ask quite a lot about the problem it causes.
> >
> > Best,
> > Paul Lam
> >
> > 在 2019年4月24日,14:49,Stephan Ewen  写道:
> >
> > Sounds reasonable to me. If it is a broken feature, then there is not
> much
> > value in it.
> >
> > On Tue, Apr 23, 2019 at 7:50 PM Gary Yao  wrote:
> >
> > Hi all,
> >
> > As the subject states, I am proposing to temporarily remove support for
> > changing the parallelism of a job via the following syntax [1]:
> >
> >./bin/flink modify [job-id] -p [new-parallelism]
> >
> > This is an experimental feature that we introduced with the first rollout
> > of
> > FLIP-6 (Flink 1.5). However, this feature comes with a few caveats:
> >
> >* Rescaling does not work with HA enabled [2]
> >* New parallelism is not persisted, i.e., after a JobManager restart,
> > the job
> >  will be recovered with the initial parallelism
> >
> > Due to the above-mentioned issues, I believe that currently nobody uses
> > "modify -p" to rescale their jobs in production. Moreover, the rescaling
> > feature stands in the way of our current efforts to rework Flink's
> > scheduling
> > [3]. I therefore propose to remove the rescaling code for the time being.
> > Note
> > that it will still be possible to change the parallelism by taking a
> > savepoint
> > and restoring the job with a different parallelism [4].
> >
> > Any comments and suggestions will be highly appreciated.
> >
> > Best,
> > Gary
> >
> > [1]
> > https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/cli.html
> > [2] https://issues.apache.org/jira/browse/FLINK-8902
> > [3] https://issues.apache.org/jira/browse/FLINK-10429
> > [4]
> >
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/state/savepoints.html#what-happens-when-i-change-the-parallelism-of-my-program-when-restoring
> >
> >
> >
>


Re: [DISCUSS] FLIP-415: Introduce a new join operator to support minibatch

2024-01-15 Thread shuai xu
Hi Benchao,

Do you have any other questions about this issue? Also, I would appreciate 
your thoughts on the proposal to introduce the new option 
'table.exec.mini-batch.compact-changes-enabled'. I'm looking forward to your 
feedback.

> 2024年1月12日 15:01,shuai xu  写道:
> 
> Suppose we currently have a job that joins two CDC sources after 
> de-duplicating them and the output is available for audit analysis, and the 
> user turns off the parameter 
> "table.exec.deduplicate.mini-batch.compact-changes-enabled" to ensure that it 
> does not lose update details. If we don't introduce this parameter, after the 
> user upgrades the version, some update details may be lost due to the 
> mini-batch connection being enabled by default, resulting in distorted audit 
> results.
> 
>> 2024年1月11日 16:19,Benchao Li  写道:
>> 
>>> the change might not be supposed for the downstream of the job which 
>>> requires details of changelog
>> 
>> Could you elaborate on this a bit? I've never met such kinds of
>> requirements before, I'm curious what is the scenario that requires
>> this.
>> 
>> shuai xu  于2024年1月11日周四 13:08写道:
>>> 
>>> Thanks for your response, Benchao.
>>> 
>>> Here is my thought on the newly added option.
>>> Users' current jobs are running on a version without minibatch join. If the 
>>> existing option to enable minibatch join is utilized, then when users' jobs 
>>> are migrated to the new version, the internal behavior of the join 
>>> operation within the jobs will change. Although the semantic of changelog 
>>> emitted by the Join operator is eventual consistency, the change might not 
>>> be supposed for the downstream of the job which requires details of 
>>> changelog. This newly added option also refers to 
>>> 'table.exec.deduplicate.mini-batch.compact-changes-enabled'.
>>> 
>>> As for the implementation,The new operator shares the state of the original 
>>> operator and it merely has an additional minibatch for storing records to 
>>> do some optimization. The storage remains consistent, and there is minor 
>>> modification to the computational logic.
>>> 
>>> Best,
>>> Xu Shuai
>>> 
>>>> 2024年1月10日 22:56,Benchao Li  写道:
>>>> 
>>>> Thanks shuai for driving this, mini-batch Join is a very useful
>>>> optimization, +1 for the general idea.
>>>> 
>>>> Regarding the configuration
>>>> "table.exec.stream.join.mini-batch-enabled", I'm not sure it's really
>>>> necessary. The semantic of changelog emitted by the Join operator is
>>>> eventual consistency, so there is no much difference between original
>>>> Join and mini-batch Join from this aspect. Besides, introducing more
>>>> options would make it more complex for users, harder to understand and
>>>> maintain, which we should be careful about.
>>>> 
>>>> One thing about the implementation, could you make the new operator
>>>> share the same state definition with the original one?
>>>> 
>>>> shuai xu  于2024年1月10日周三 21:23写道:
>>>>> 
>>>>> Hi devs,
>>>>> 
>>>>> I’d like to start a discussion on FLIP-415: Introduce a new join operator 
>>>>> to support minibatch[1].
>>>>> 
>>>>> Currently, when performing cascading connections in Flink, there is a 
>>>>> pain point of record amplification. Every record join operator receives 
>>>>> would trigger join process. However, if records of +I and -D matches , 
>>>>> they could be folded to reduce two times of join process. Besides, 
>>>>> records of  -U +U might output 4 records in which two records are 
>>>>> redundant when encountering outer join .
>>>>> 
>>>>> To address this issue, this FLIP introduces a new  
>>>>> MiniBatchStreamingJoinOperator to achieve batch processing which could 
>>>>> reduce number of outputting redundant messages and avoid unnecessary join 
>>>>> processes.
>>>>> A new option is added to control the operator to avoid influencing 
>>>>> existing jobs.
>>>>> 
>>>>> Please find more details in the FLIP wiki document [1]. Looking
>>>>> forward to your feedback.
>>>>> 
>>>>> [1]
>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-415%3A+Introduce+a+new+join+operator+to+support+minibatch
>>>>> 
>>>>> Best,
>>>>> Xu Shuai
>>>> 
>>>> 
>>>> 
>>>> --
>>>> 
>>>> Best,
>>>> Benchao Li
>>> 
>> 
>> 
>> -- 
>> 
>> Best,
>> Benchao Li
> 
> Best, 
> Xu Shuai


Best,
Xu Shuai

Re: [DISCUSS] FLIP-415: Introduce a new join operator to support minibatch

2024-01-17 Thread shuai xu
Hi Benchao,

I think your suggestion is very reasonable. For most users, having compaction 
enabled by default when mini-batch is enabled is the more beneficial approach. 
However, compaction within a minibatch is another thing we could discuss in the 
future, as it is orthogonal to this discussion. The minibatch join itself will 
follow the options 'table.exec.mini-batch.enabled', 
'table.exec.mini-batch.allow-latency' and 'table.exec.mini-batch.size'.
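
Concretely, enabling the minibatch join would then require nothing beyond the 
existing mini-batch settings. A minimal sketch (the option values are 
arbitrary examples; the class itself is hypothetical):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class MiniBatchJoinConfigSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            // Existing mini-batch options; under this proposal the minibatch
            // join follows them and needs no join-specific switch.
            tEnv.getConfig().getConfiguration()
                    .setString("table.exec.mini-batch.enabled", "true");
            tEnv.getConfig().getConfiguration()
                    .setString("table.exec.mini-batch.allow-latency", "5 s");
            tEnv.getConfig().getConfiguration()
                    .setString("table.exec.mini-batch.size", "5000");
        }
    }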

> 2024年1月16日 18:52,Benchao Li  写道:
> 
> shuai,
> 
> Thanks for the explanations, I understand the scenario you described
> now. IIUC, this will be a rather rare case that need to disable
> "compaction" when mini-batch is enabled, so I won't be against
> introducing it. However, I would suggest to enable the "compaction" by
> default (if mini-batch enabled), which will benefit most of use cases.
> For others that have special requirements about the changelog semantic
> (no compaction), they can disable compaction by themselves. WDYT?
> 
>> This is a relatively large optimization that may pose a significant
>> risk of bugs, so I like to keep it from being enabled by default for
>> now.
> @Jingsong has raised an interesting point that for large optimization
> or new features, we want to have an option for it and disable it by
> default in case of the risk of bugs. I agree with it, mostly.
> Currently there is no standard about whether a change is major or not,
> which means we may run into a situation debating whether a change is
> major or not. Anyway, it's an orthogonal topic to this discussion.
> 
> shuai xu  于2024年1月16日周二 13:14写道:
>> 
>> Hi Benchao,
>> 
>> Do you have any other questions about this issue?  Also, I would appreciate 
>> your thoughts on the proposal to introduce the new option 
>> 'table.exec.mini-batch.compact-changes-enabled'. I’m looking forward your 
>> feedback.
>> 
>>> 2024年1月12日 15:01,shuai xu  写道:
>>> 
>>> Suppose we currently have a job that joins two CDC sources after 
>>> de-duplicating them and the output is available for audit analysis, and the 
>>> user turns off the parameter 
>>> "table.exec.deduplicate.mini-batch.compact-changes-enabled" to ensure that 
>>> it does not lose update details. If we don't introduce this parameter, 
>>> after the user upgrades the version, some update details may be lost due to 
>>> the mini-batch connection being enabled by default, resulting in distorted 
>>> audit results.
>>> 
>>>> 2024年1月11日 16:19,Benchao Li  写道:
>>>> 
>>>>> the change might not be supposed for the downstream of the job which 
>>>>> requires details of changelog
>>>> 
>>>> Could you elaborate on this a bit? I've never met such kinds of
>>>> requirements before, I'm curious what is the scenario that requires
>>>> this.
>>>> 
>>>> shuai xu  于2024年1月11日周四 13:08写道:
>>>>> 
>>>>> Thanks for your response, Benchao.
>>>>> 
>>>>> Here is my thought on the newly added option.
>>>>> Users' current jobs are running on a version without minibatch join. If 
>>>>> the existing option to enable minibatch join is utilized, then when 
>>>>> users' jobs are migrated to the new version, the internal behavior of the 
>>>>> join operation within the jobs will change. Although the semantic of 
>>>>> changelog emitted by the Join operator is eventual consistency, the 
>>>>> change might not be supposed for the downstream of the job which requires 
>>>>> details of changelog. This newly added option also refers to 
>>>>> 'table.exec.deduplicate.mini-batch.compact-changes-enabled'.
>>>>> 
>>>>> As for the implementation,The new operator shares the state of the 
>>>>> original operator and it merely has an additional minibatch for storing 
>>>>> records to do some optimization. The storage remains consistent, and 
>>>>> there is minor modification to the computational logic.
>>>>> 
>>>>> Best,
>>>>> Xu Shuai
>>>>> 
>>>>>> 2024年1月10日 22:56,Benchao Li  写道:
>>>>>> 
>>>>>> Thanks shuai for driving this, mini-batch Join is a very useful
>>>>>> optimization, +1 for the general idea.
>>>>>> 
>>>>>> Regarding the configuration
>>>>>> "table.exec.st

Re: [DISCUSS] FLIP-415: Introduce a new join operator to support minibatch

2024-01-17 Thread shuai xu
Hi all,

Thank you for the valuable input. 

Based on the current discussion, the minibatch join will follow the three 
existing options 'table.exec.mini-batch.enabled', 
'table.exec.mini-batch.allow-latency' and 'table.exec.mini-batch.size'. As for 
the compaction within the minibatch that was mentioned in the discussion, it 
could be discussed in a future FLIP.

Do any of you have further questions regarding this FLIP? If there are no more 
comments, I would like to open a voting thread at 12 a.m. UTC+8 on January 
19th. 
> 2024年1月10日 21:23,shuai xu  写道:
> 
> Hi devs,
> 
> I’d like to start a discussion on FLIP-415: Introduce a new join operator to 
> support minibatch[1].
> 
> Currently, when performing cascading connections in Flink, there is a pain 
> point of record amplification. Every record join operator receives would 
> trigger join process. However, if records of +I and -D matches , they could 
> be folded to reduce two times of join process. Besides, records of  -U +U 
> might output 4 records in which two records are redundant when encountering 
> outer join . 
> 
> To address this issue, this FLIP introduces a new  
> MiniBatchStreamingJoinOperator to achieve batch processing which could reduce 
> number of outputting redundant messages and avoid unnecessary join processes. 
> A new option is added to control the operator to avoid influencing existing 
> jobs.
> 
> Please find more details in the FLIP wiki document [1]. Looking
> forward to your feedback.
> 
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-415%3A+Introduce+a+new+join+operator+to+support+minibatch
> 
> Best,
> Xu Shuai

Best,
Xu Shuai

[VOTE] FLIP-415: Introduce a new join operator to support minibatch

2024-01-18 Thread shuai xu
Dear Flink Developers,

Thank you for providing feedback on FLIP-415: Introduce a new join operator to 
support minibatch[1]. I'd like to start a vote on this FLIP. Here is the 
discussion thread[2]. 

After the discussion, this FLIP will not introduce any new option. The 
minibatch join will compact the changelog by default. As for the option to 
control compaction within the minibatch that was mentioned in the discussion, 
it could be discussed in a future FLIP.

The vote will be open for at least 72 hours unless there is an objection or
insufficient votes.

Best,
Xu Shuai

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-415%3A+Introduce+a+new+join+operator+to+support+minibatch
[2]
https://lists.apache.org/thread/pd9gzslq20dtzzfphxqvwhc43hrzo2y1


[RESULT][VOTE] FLIP-415: Introduce a new join operator to support minibatch

2024-01-23 Thread shuai xu
Hi devs,

I'm glad to announce that the FLIP-415[1]  has been accepted. The voting
thread is here[2].

The proposal received 5 approving votes, 4 of which are binding:
- Lincoln Lee (binding)
- Ron liu (binding)
- Jane Chan (binding)
- Benchao Li (binding)
- Xuyang (non-binding)

And there is no disapproving one.

Thanks to all participants for discussion and voting!

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-415%3A+Introduce+a+new+join+operator+to+support+minibatch
[2]
https://lists.apache.org/thread/xc0c4zsq4qlyot4vcbdxznr9ocnd0pm0

Best,
Xu Shuai


[RESULT][VOTE] FLIP-415: Introduce a new join operator to support minibatch

2024-01-25 Thread shuai xu
Hi devs,

I'm glad to announce that the FLIP-415[1] has been accepted. The voting
thread is here[2].

The proposal received 5 approving votes, as follows:
- Lincoln Lee (binding)
- liu ron (binding)
- Jane Chan (binding)
- Benchao Li (binding)
- Xuyang (non-binding)

And there is no disapproving one.

Thanks to all participants for discussion and voting!

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-415%3A+Introduce+a+new+join+operator+to+support+minibatch
[2]
https://lists.apache.org/thread/xc0c4zsq4qlyot4vcbdxznr9ocnd0pm0

[RESULT][VOTE] FLIP-415: Introduce a new join operator to support minibatch

2024-01-25 Thread shuai xu
Hi devs,

I'm glad to announce that the FLIP-415[1]  has been accepted. The voting
thread is here[2].

The proposal received five approving votes, four of which are binding:
- Lincoln Lee (binding)
- Jane Chan (binding)
- Ron liu (binding)
- Benchao Li (binding)
- Xuyang (non-binding)


And there is no disapproving one.

Thanks to all participants for discussion and voting!

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-415%3A+Introduce+a+new+join+operator+to+support+minibatch
[2] https://lists.apache.org/thread/xc0c4zsq4qlyot4vcbdxznr9ocnd0pm0

Re: [SUMMARY] Flink 1.19 Release Sync 01/23/2024

2024-01-25 Thread shuai xu
Hi everyone,

I am currently working on FLIP-415[1], which aims to support minibatch
join. This will bring higher performance for regular joins.
There is still one task to be merged, which has undergone multiple rounds
of review. I expect it can be merged on Jan 26th.

Therefore, I am +1 with Feng Jin on delaying the deadline.

[1]https://cwiki.apache.org/confluence/display/FLINK/FLIP-415%3A+Introduce+a+new+join+operator+to+support+minibatch

Best regards,
Xu Shuai


Lincoln Lee  于2024年1月24日周三 22:00写道:
>
> Hi devs,
>
> I'd like to share some highlights from the release sync on 01/23/2024
>
>
> *- Feature freeze*  *We plan to freeze the feature on Jan 26th. If there's
> specific need for an extension, please confirm with RMs by replying this
> mail.*
>
>
> *- Features & issues tracking*  So far we've had 15 flips been marked
> done(some documentation is still in progress), we also ask responsible
> contributors to help update the status of the remaining items on the 1.19
> wiki page [1], including *documentation and cross-team testing requirements*,
> this will help the release process.
>
>
> *- Blockers*  There're performance regression and blocker issues are being
> worked on:
>   https://issues.apache.org/jira/browse/FLINK-34148
>   https://issues.apache.org/jira/browse/FLINK-34007
>   https://issues.apache.org/jira/browse/FLINK-34225
>   Note that test instabilities will be upgraded to blocker if it is newly
> introduced.
>
> *- Sync meeting* (https://meet.google.com/vcx-arzs-trv)
>   The next release sync is *Jan 30th, 2024*. We'll switch to weekly release
> sync.
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/1.19+Release
>
> Best,
> Yun, Jing, Martijn and Lincoln


Re: Support minibatch for TopNFunction

2024-03-27 Thread shuai xu
Hi, Roman

Thanks for your proposal. I think this is an interesting idea, and it might be 
useful when there are operators downstream of the TopN.
I have some questions about your proposal after reading your doc.

1. From the input-output perspective, only the accumulated data seems to be 
sent. If the accumulated record +recordA has already been sent in a previous 
batch, would the -recordA be sent in this batch? Could you provide a detailed 
rule for folding redundant records? 

2. The minibatch join [1] reduces state access during the join process because 
it folds redundant records before they enter the process. From your doc, 
folding redundant records is implemented after the TopN process. In other 
words, it does not reduce the pressure of state access on TopN itself, but 
rather just folds output results that could be redundant. Is that right?

3. For the optimization results, the metric of output rows alone may not be 
persuasive. Could you offer a result with metrics from Nexmark?

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#minibatch-regular-joins

Best,
Xushuai
> 2024年3月26日 00:00,Roman Boyko  写道:
> 
> Hi Ron,
> Thank you so much for your reply!
> 
> 1. I added the description to Motivation part of my document [1]
> 2. I suppose to inject this functionality to AbstractTopNFunction, thus it
> will work for all its implementations. It doesn't depend of implementation
> (either it would be AppendOnlyTopNFunction or RetractableTopNFunction,
> except maybe FastTop1Function), the most effect it would have for functions
> with:
> - topN functions without no-ranking optimization [2]
> - high value of N (top1 has less possibilities for optimization here)
> - frequent input records which are placed to the top 1 position
> 3. I will do it in a week - I need to fix and recheck some parts
> 4. Unfortunately I don't have permissions to Flink confluence and according
> to contribution guide I first expressed my idea as google doc. I would be
> happy to transform this idea to FLIP.
> 
> [1]
> https://docs.google.com/document/d/1YPHwxKfiGSUOUOa6bc68fIJHO_UojTwZEC29VVEa-Uk
> 
> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/topn/#no-ranking-output-optimization
> 
> --
> Best regards,
> Roman Boyko
> e.: ro.v.bo...@gmail.com
> m.: +79059592443
> telegram: @rboyko
> 
> On Mon, 25 Mar 2024 at 15:12, Ron liu  wrote:
> 
>> Hi, Roman
>> 
>> Thanks for your proposal, I intuitively feel that this optimization would
>> be very useful to reduce the amount of message amplification for TopN
>> operators. After briefly looking at your google docs, I have the following
>> questions:
>> 
>> 1. Whether you can describe in detail the principle of solving the TopN
>> operator record amplification, similar to Minibatch Join[1], through the
>> figure of current Motivation part, I can not understand how you did it
>> 2. TopN has currently multiple implementation functions, including
>> AppendOnlyFirstNFunction, AppendOnlyTopNFunction, FastTop1Function,
>> RetractableTopNFunction, UpdatableTopNFunction. Is it possible to elaborate
>> on which patterns the Minibatch optimization applies to?
>> 3. Is it possible to provide the PoC code?
>> 4. finally, we need a formal FLIP document on the wiki[2].
>> 
>> [1]
>> 
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#minibatch-regular-joins
>> [2]
>> 
>> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
>> 
>> Best,
>> Ron
>> 
>> Roman Boyko  于2024年3月24日周日 01:14写道:
>> 
>>> Hi Flink Community,
>>> 
>>> I tried to describe my idea about minibatch for TopNFunction in this doc
>> -
>>> 
>>> 
>> https://docs.google.com/document/d/1YPHwxKfiGSUOUOa6bc68fIJHO_UojTwZEC29VVEa-Uk/edit?usp=sharing
>>> 
>>> Looking forward to your feedback, thank you
>>> 
>>> On Tue, 19 Mar 2024 at 12:24, Roman Boyko  wrote:
>>> 
 Hello Flink Community,
 
 The same problem with record amplification as described in FLIP-415:
>>> Introduce
 a new join operator to support minibatch[1] exists for most of
 implementations of AbstractTopNFunction. Especially when the rank is
 provided to output. For example, when calculating Top100 with rank
>>> output,
 every input record might produce 100 -U records and 100 +U records.
 
 According to my POC (which is similar to FLIP-415) the record
 amplification could be significantly reduced by using input or output
 buffer.
 
 What do you think if we implement such optimization for TopNFunctions?
 
 [1]
 https://cwiki.apache.org/confluence/display/FLINK/FLIP-415
 %3A+Introduce+a+new+join+operator+to+support+minibatch
 
 --
 Best regards,
 Roman Boyko
 e.: ro.v.bo...@gmail.com
 m.: +79059592443
 telegram: @rboyko
 
>>> 
>>> 
>>> --
>>> Best regards,
>>> Roman B

Re: [VOTE]FLIP-480: Support to deploy script in application mode

2024-11-12 Thread shuai xu
+1 (non-binding)

Best,
Xushuai

> 2024年11月13日 10:48,Zhu Zhu  写道:
> 
> +1 (binding)
> 
> Thanks,
> Zhu
> 
> Ferenc Csaky  于2024年11月13日周三 02:37写道:
> 
>> +1 (binding)
>> 
>> Best,
>> Ferenc
>> 
>> 
>> 
>> 
>> On Tuesday, November 12th, 2024 at 03:28, Xuyang 
>> wrote:
>> 
>>> 
>>> 
>>> +1 (non-binding)
>>> 
>>> 
>>> --
>>> 
>>> Best!
>>> Xuyang
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 在 2024-11-12 10:02:27,"Ron Liu" ron9@gmail.com 写道:
>>> 
 +1(binding)
 
 Best,
 Ron
 
 Gyula Fóra gyula.f...@gmail.com 于2024年11月12日周二 01:27写道:
 
> + 1(binding)
> 
> Thanks for answering my concerns/questions.
> 
> Gyula
> 
> On Fri, Nov 8, 2024 at 11:16 AM Gyula Fóra gyula.f...@gmail.com
>> wrote:
> 
>> Hey!
>> 
>> Sorry, bit late to the party, I have added a concern to the
>> discussion
>> related to the gateway submission vote.
>> 
>> I would like to clarify that before we close this vote.
>> 
>> Cheers,
>> Gyula
>> 
>> On Fri, Nov 8, 2024 at 10:57 AM Feng Jin jinfeng1...@gmail.com
>> wrote:
>> 
>>> +1 (non-binding)
>>> 
>>> Best,
>>> Feng Jin
>>> 
>>> On Fri, Nov 8, 2024 at 5:37 PM yuanfeng hu yuanf...@apache.org
>> wrote:
>>> 
 +1(no-binding)
 
 Shengkai Fang fskm...@gmail.com 于2024年11月8日周五 15:11写道:
 
> Hi everyone,
> 
> I'd like to start a vote on FLIP-480: Support to deploy
>> script in
> application mode[1]. The discussion can be found here[2].
> 
> The vote will be open for at least 72 hours unless there are
>> any
> objections
> or insufficient votes.
> 
> Best,
> Shengkai
> 
> [1]
> 
> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-480%3A+Support+to+deploy+SQL+script+in+application+mode
> 
> [2]
> 
>> https://lists.apache.org/thread/g3ohzbogww1g8zl7zlmn84fsk29qr568
 
 --
 Best,
 Yuanfeng
>> 



[DISCUSS] FLIP-519: Introduce async lookup key ordered mode

2025-04-07 Thread shuai xu
Hi devs,

I'd like to start a discussion on FLIP-519: Introduce async lookup key
ordered mode[1].

The Flink system currently supports both record-level ordered and
unordered output modes for asynchronous lookup joins. However, it does
not guarantee the processing order of records sharing the same key.

As highlighted in [2], there are two key requirements for enhancing
async I/O operations:
1. Ensuring the processing order of records with the same key is a
common requirement in DataStream.
2. Sequential processing of records sharing the same upsertKey when
performing a lookup join in Flink SQL is essential for maintaining
correctness.

This optimization aims to balance correctness and performance for
stateful streaming workloads. The FLIP therefore introduces a new operator,
KeyedAsyncWaitOperator, to support the optimization. Besides, a new
option is added to control the behaviour and avoid influencing existing
jobs.
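
A minimal sketch of how a job might opt in, assuming the option names from 
this thread ('table.exec.async-lookup.output-mode' already exists; 
'table.exec.async-lookup.key-ordered-enabled' is the new option proposed by 
this FLIP):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class KeyOrderedLookupSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            // Existing option: output order of async lookup joins.
            tEnv.getConfig().getConfiguration()
                    .setString("table.exec.async-lookup.output-mode", "ORDERED");
            // Proposed option: process records sharing the same (upsert) key
            // sequentially, while records with different keys still run
            // concurrently.
            tEnv.getConfig().getConfiguration()
                    .setString("table.exec.async-lookup.key-ordered-enabled", "true");
        }
    }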

Please find more details in the FLIP wiki document [1]. Looking forward
to your feedback.

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-519%3A++Introduce+async+lookup+key+ordered+mode
[2] https://lists.apache.org/thread/wczzjhw8g0jcbs8lw2jhtrkw858cmx5n

Best,
Xu Shuai


Re: [ANNOUNCE] New Apache Flink Committer - Xiqian Yu

2025-04-16 Thread shuai xu
Congratulations! 

Best, 
Xu Shuai

> 2025年4月17日 13:54,Lincoln Lee  写道:
> 
> Congratulations, Xiqian!
> 
> 
> Best,
> Lincoln Lee
> 
> 
> Shengkai Fang  于2025年4月17日周四 11:51写道:
> 
>> Congratulations!
>> 
>> Best
>> Shengkai
>> 
>> Hongshun Wang  于2025年4月17日周四 10:39写道:
>> 
>>> Congratulations!
>>> 
>>> Best
>>> Hongshun
>>> 
 2025年4月16日 15:25,Leonard Xu  写道:
 
 becoming
>>> 
>>> 
>> 



Re: [ANNOUNCE] New Apache Flink Committer - Yanquan Lv

2025-04-17 Thread shuai xu
Congratulations, Yanquan!

Best,
Xu Shuai

> 2025年4月17日 14:27,Xuyang  写道:
> 
> Congratulations, Yanquan!



Re: [DISCUSS] FLIP-519: Introduce async lookup key ordered mode

2025-04-17 Thread shuai xu
Hi Xuyang,


Thanks for your response. Actually, ALLOW_UNORDERED is only enabled when 
facing append-only streams, for higher throughput. KEY_ORDERED is designed for 
scenarios where an upsert key exists. Its goal is to achieve higher performance 
compared to the ORDERED mode while ensuring correctness is not compromised. In 
a word, ALLOW_UNORDERED mode does not work in the presence of an upsert key.

Aside from the literal difference in sequence, KEY_ORDERED focuses on the 
processing order, while ALLOW_UNORDERED pertains to the output order. In 
ALLOW_UNORDERED mode, the processing order may be either ordered or unordered.


Best,
Xu Shuai

> 2025年4月17日 14:53,Xuyang  写道:
> 
> Hi, Shuai.
> 
> This is a valuable addition to the current AsyncLookupJoin, and I’m 
> 
> generally in favor of it. 
> 
> 
> 
> 
> I have one question. Why do we need to introduce additional parameters 
> 
> to control KEY_ORDERED and ALLOW_UNORDERED? In other words, 
> 
> what scenarios require allowing users to perform completely unordered 
> 
> async lookup joins in the presence of an upsert key?
> 
> 
> 
> 
> --
> 
>Best!
>Xuyang
> 
> 
> 
> 
> 
> 在 2025-04-11 10:39:46,"shuai xu"  写道:
>> Hi all,
>> 
>> This FLIP will primarily focus on the implementation within the table 
>> module. As for support in the DataStream API, it will be addressed in a 
>> separate FLIP.
>> 
>>> 2025年4月8日 09:57,shuai xu  写道:
>>> 
>>> Hi devs,
>>> 
>>> I'd like to start a discussion on FLIP-519: Introduce async lookup key
>>> ordered mode[1].
>>> 
>>> The Flink system currently supports both record-level ordered and
>>> unordered output modes for asynchronous lookup joins. However, it does
>>> not guarantee the processing order of records sharing the same key.
>>> 
>>> As highlighted in [2], there are two key requirements for enhancing
>>> async io operations:
>>> 1. Ensuring the processing order of records with the same key is a
>>> common requirement in DataStream.
>>> 2. Sequential processing of records sharing the same upsertKey when
>>> performing lookup join in Flink SQL is essential for maintaining
>>> correctness.
>>> 
>>> This optimization aims to balance correctness and performance for
>>> stateful streaming workloads.Then the FLIP introduce a new operator
>>> KeyedAsyncWaitOperator to supports the optimization. Besides, a new
>>> option is added to control the behaviour avoid influencing existing
>>> jobs.
>>> 
>>> please find more details in the FLIP wiki document[1]. Looking forward
>>> to your feedback.
>>> 
>>> [1] 
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-519%3A++Introduce+async+lookup+key+ordered+mode
>>> [2] https://lists.apache.org/thread/wczzjhw8g0jcbs8lw2jhtrkw858cmx5n
>>> 
>>> Best,
>>> Xu Shuai



Re: [DISCUSS] FLIP-519: Introduce async lookup key ordered mode

2025-04-10 Thread shuai xu
Hi Zakelly,

Thank you for your response and for taking responsibility for generalizing the 
functionality of the 'Asynchronous Execution Model' (AEC). As we discussed 
earlier, the subsequent work on this FLIP will be based on operators that 
support AEC. If you need any further discussion, please feel free to reach out 
to me directly.

> 2025年4月10日 12:31,Zakelly Lan  写道:
> 
> Hi all,
> 
> I have also added a 'Follow up' section at the end of the FLIP-425[1]
> describing this.
> 
> [1] https://cwiki.apache.org/confluence/x/S4p3EQ
> 
> 
> Best,
> Zakelly
> 
> On Wed, Apr 9, 2025 at 12:42 PM Zakelly Lan  wrote:
> 
>> Thanks for driving this!
>> 
>> +1 for the FLIP given there is a solid user case behind.
>> 
>> Shuai and I had a discussion and we agree that `KeyedAsyncWaitOperator`
>> in current FLIP shares similar functionality with the `Asynchronous
>> Execution Model (AEC)` introduced in FLIP-425[1]. We think it is better to
>> generalize the AEC for all keyed ordered cases, not only for state access.
>> So I'd make this happen after the approval of this FLIP. Hope this helps
>> all the similar operators to implement.
>> 
>> [1] https://cwiki.apache.org/confluence/x/S4p3EQ
>> 
>> 
>> Best,
>> Zakelly
>> 
>> On Tue, Apr 8, 2025 at 10:00 AM shuai xu  wrote:
>> 
>>> Hi devs,
>>> 
>>> I'd like to start a discussion on FLIP-519: Introduce async lookup key
>>> ordered mode[1].
>>> 
>>> The Flink system currently supports both record-level ordered and
>>> unordered output modes for asynchronous lookup joins. However, it does
>>> not guarantee the processing order of records sharing the same key.
>>> 
>>> As highlighted in [2], there are two key requirements for enhancing
>>> async io operations:
>>> 1. Ensuring the processing order of records with the same key is a
>>> common requirement in DataStream.
>>> 2. Sequential processing of records sharing the same upsertKey when
>>> performing lookup join in Flink SQL is essential for maintaining
>>> correctness.
>>> 
>>> This optimization aims to balance correctness and performance for
>>> stateful streaming workloads.Then the FLIP introduce a new operator
>>> KeyedAsyncWaitOperator to supports the optimization. Besides, a new
>>> option is added to control the behaviour avoid influencing existing
>>> jobs.
>>> 
>>> please find more details in the FLIP wiki document[1]. Looking forward
>>> to your feedback.
>>> 
>>> [1]
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-519%3A++Introduce+async+lookup+key+ordered+mode
>>> [2] https://lists.apache.org/thread/wczzjhw8g0jcbs8lw2jhtrkw858cmx5n
>>> 
>>> Best,
>>> Xu Shuai
>>> 
>> 



Re: [DISCUSS] FLIP-519: Introduce async lookup key ordered mode

2025-04-10 Thread shuai xu
Hi all,

This FLIP will primarily focus on the implementation within the table module. 
As for support in the DataStream API, it will be addressed in a separate FLIP.

> 2025年4月8日 09:57,shuai xu  写道:
> 
> Hi devs,
> 
> I'd like to start a discussion on FLIP-519: Introduce async lookup key
> ordered mode[1].
> 
> The Flink system currently supports both record-level ordered and
> unordered output modes for asynchronous lookup joins. However, it does
> not guarantee the processing order of records sharing the same key.
> 
> As highlighted in [2], there are two key requirements for enhancing
> async io operations:
> 1. Ensuring the processing order of records with the same key is a
> common requirement in DataStream.
> 2. Sequential processing of records sharing the same upsertKey when
> performing lookup join in Flink SQL is essential for maintaining
> correctness.
> 
> This optimization aims to balance correctness and performance for
> stateful streaming workloads.Then the FLIP introduce a new operator
> KeyedAsyncWaitOperator to supports the optimization. Besides, a new
> option is added to control the behaviour avoid influencing existing
> jobs.
> 
> please find more details in the FLIP wiki document[1]. Looking forward
> to your feedback.
> 
> [1] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-519%3A++Introduce+async+lookup+key+ordered+mode
> [2] https://lists.apache.org/thread/wczzjhw8g0jcbs8lw2jhtrkw858cmx5n
> 
> Best,
> Xu Shuai



Re: [DISCUSS] FLIP-519: Introduce async lookup key ordered mode

2025-05-06 Thread shuai xu
Hi devs, 

There haven't been any further responses to this email over the past few days, 
so I'd like to initiate a vote on the current proposal [1] in the next few 
days. If there are any concerns about FLIP-519 [1], I will gladly pause and 
make the necessary adjustments. 

Best, 
Xu Shuai

[1]https://cwiki.apache.org/confluence/display/FLINK/FLIP-519%3A++Introduce+async+lookup+key+ordered+mode

> 2025年4月8日 09:57,shuai xu  写道:
> 
> Hi devs,
> 
> I'd like to start a discussion on FLIP-519: Introduce async lookup key
> ordered mode[1].
> 
> The Flink system currently supports both record-level ordered and
> unordered output modes for asynchronous lookup joins. However, it does
> not guarantee the processing order of records sharing the same key.
> 
> As highlighted in [2], there are two key requirements for enhancing
> async io operations:
> 1. Ensuring the processing order of records with the same key is a
> common requirement in DataStream.
> 2. Sequential processing of records sharing the same upsertKey when
> performing lookup join in Flink SQL is essential for maintaining
> correctness.
> 
> This optimization aims to balance correctness and performance for
> stateful streaming workloads.Then the FLIP introduce a new operator
> KeyedAsyncWaitOperator to supports the optimization. Besides, a new
> option is added to control the behaviour avoid influencing existing
> jobs.
> 
> please find more details in the FLIP wiki document[1]. Looking forward
> to your feedback.
> 
> [1] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-519%3A++Introduce+async+lookup+key+ordered+mode
> [2] https://lists.apache.org/thread/wczzjhw8g0jcbs8lw2jhtrkw858cmx5n
> 
> Best,
> Xu Shuai



Re: [DISCUSS] FLIP-519: Introduce async lookup key ordered mode

2025-04-27 Thread shuai xu
Hi Ron,

Thank you so much for your feedback and support on this proposal! I really 
appreciate you taking the time to review it in detail.

You're absolutely right about the table's content. When 
table.exec.async-lookup.key-ordered-enabled=true and 
table.exec.async-lookup.output-mode=ORDERED, the lookup join in key order 
should indeed be marked as "yes" instead of "no". The current representation 
is a bit misleading and could cause confusion.

I'll update the FLIP to correct this point and ensure it accurately reflects 
the behavior.

Best,
Shuai

> 2025年4月25日 10:23,Ron Liu  写道:
> 
> Hi, Shuai
> 
> Thanks for driving this proposal. The FLIP looks good to me overall, +1 for
> it.
> 
> I have a small question about the table's contents. When
> 'table.exec.async-lookup.key-ordered-enabled=true' and
> 'table.exec.async-lookup.output-mode=ORDERD', the lookup join in key
> ordered should be 'yes', not 'no'.
> ORDERD is naturally guaranteed to be ordered, it just doesn't depend on the
> current implementation of this proposal, which is a bit confusing here.
> 
> 
> Best,
> Ron
> 
> shuai xu  于2025年4月18日周五 10:50写道:
> 
>> Hi Xuyang,
>> 
>> 
>> Thanks for your response. Actually, ALLOW_UNORDERED is only enabled when
>> facing
>> 
>> append-only streams for higher throughput. KEY_ORDERED is designed for
>> scenarios
>> 
>> where upsert key exists. Its goal is to achieve higher performance
>> compared to the ORDERED
>> 
>> mode while ensuring correctness is not compromised. In a word,
>> ALLOW_UNORDERED
>> 
>> mode does works in the presence of an upsert key.
>> 
>> 
>> Aside from the literal difference in sequence, KEY_ORDER focuses on the
>> processing order,
>> 
>> while ALLOW_UNORDERED pertains to the output order. In ALLOW_UNORDERED
>> mode,
>> 
>> the processing order may be either ordered or unordered.
>> 
>> 
>> Best,
>> Xu Shuai
>> 
>>> 2025年4月17日 14:53,Xuyang  写道:
>>> 
>>> Hi, Shuai.
>>> 
>>> This is a valuable addition to the current AsyncLookupJoin, and I’m
>>> 
>>> generally in favor of it.
>>> 
>>> 
>>> 
>>> 
>>> I have one question. Why do we need to introduce additional parameters
>>> 
>>> to control KEY_ORDERED and ALLOW_UNORDERED? In other words,
>>> 
>>> what scenarios require allowing users to perform completely unordered
>>> 
>>> async lookup joins in the presence of an upsert key?
>>> 
>>> 
>>> 
>>> 
>>> --
>>> 
>>>   Best!
>>>   Xuyang
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 在 2025-04-11 10:39:46,"shuai xu"  写道:
>>>> Hi all,
>>>> 
>>>> This FLIP will primarily focus on the implementation within the table
>> module. As for support in the DataStream API, it will be addressed in a
>> separate FLIP.
>>>> 
>>>>> 2025年4月8日 09:57,shuai xu  写道:
>>>>> 
>>>>> Hi devs,
>>>>> 
>>>>> I'd like to start a discussion on FLIP-519: Introduce async lookup key
>>>>> ordered mode[1].
>>>>> 
>>>>> The Flink system currently supports both record-level ordered and
>>>>> unordered output modes for asynchronous lookup joins. However, it does
>>>>> not guarantee the processing order of records sharing the same key.
>>>>> 
>>>>> As highlighted in [2], there are two key requirements for enhancing
>>>>> async io operations:
>>>>> 1. Ensuring the processing order of records with the same key is a
>>>>> common requirement in DataStream.
>>>>> 2. Sequential processing of records sharing the same upsertKey when
>>>>> performing lookup join in Flink SQL is essential for maintaining
>>>>> correctness.
>>>>> 
>>>>> This optimization aims to balance correctness and performance for
>>>>> stateful streaming workloads. The FLIP then introduces a new operator,
>>>>> KeyedAsyncWaitOperator, to support the optimization. Besides, a new
>>>>> option is added to control the behaviour to avoid influencing existing
>>>>> jobs.
>>>>> 
>>>>> Please find more details in the FLIP wiki document [1]. Looking forward
>>>>> to your feedback.
>>>>> 
>>>>> [1]
>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-519%3A++Introduce+async+lookup+key+ordered+mode
>>>>> [2] https://lists.apache.org/thread/wczzjhw8g0jcbs8lw2jhtrkw858cmx5n
>>>>> 
>>>>> Best,
>>>>> Xu Shuai
>> 
>> 



[VOTE]FLIP-519: Introduce async lookup key ordered mode

2025-05-09 Thread shuai xu
Hi devs,

Thank you for providing feedback on FLIP-519: Introduce async lookup key
ordered mode [1]. I'd like to start a vote on this FLIP. Here is the
discussion thread [2].


The vote will be open for at least 72 hours unless there is an objection or 
insufficient votes.

Best,
Xu Shuai

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-519%3A++Introduce+async+lookup+key+ordered+mode
[2] https://lists.apache.org/thread/z3th724l6vnylgv601gvcbdy4oy2wy7r

[RESULT][VOTE] FLIP-519: Introduce async lookup key ordered mode

2025-05-18 Thread shuai xu
Hi devs,

I'm glad to share with you that FLIP-519: Introduce async lookup key ordered
mode [1] has been approved with 4 approving votes (4 binding) [2].

Lincoln Lee (binding)
Ron Liu (binding)
Zakelly Lan (binding)
Xuyang (binding)

Thanks to everyone who participated in the discussion and voted.

Best regards,

Xu Shuai

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-519%3A++Introduce+async+lookup+key+ordered+mode
[2] https://lists.apache.org/thread/q1rksclrjllpqjvkklfqgp5dx3j2t38w

[jira] [Created] (FLINK-32219) sql client would be pending after executing plan of inserting

2023-05-30 Thread Shuai Xu (Jira)
Shuai Xu created FLINK-32219:


 Summary: sql client would be pending after executing plan of 
inserting
 Key: FLINK-32219
 URL: https://issues.apache.org/jira/browse/FLINK-32219
 Project: Flink
  Issue Type: Bug
Reporter: Shuai Xu


I compiled a plan for an INSERT statement and then executed it. However, the
SQL client hangs after running the EXECUTE PLAN statement. Here is part of the
stack trace:

{code:java}
"pool-2-thread-1" #30 prio=5 os_prio=31 tid=0x0001172e5000 nid=0x6d03 
waiting on condition [0x000173e01000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x00076e72af20> (a 
java.util.concurrent.CompletableFuture$Signaller)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
at 
java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
at 
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at 
org.apache.flink.table.api.internal.InsertResultProvider.hasNext(InsertResultProvider.java:83)
at 
org.apache.flink.table.api.internal.InsertResultProvider.access$200(InsertResultProvider.java:37)
at 
org.apache.flink.table.api.internal.InsertResultProvider$Iterator.hasNext(InsertResultProvider.java:106)
at java.util.Iterator.forEachRemaining(Iterator.java:115)
at 
org.apache.flink.util.CollectionUtil.iteratorToList(CollectionUtil.java:115)
at 
org.apache.flink.table.gateway.service.result.ResultFetcher.fromTableResult(ResultFetcher.java:163)
at 
org.apache.flink.table.gateway.service.operation.OperationExecutor.callOperation(OperationExecutor.java:542)
at 
org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:440)
at 
org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:195)
at 
org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212)
at 
org.apache.flink.table.gateway.service.SqlGatewayServiceImpl$$Lambda$389/1391083077.apply(Unknown
 Source)
at 
org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119)
at 
org.apache.flink.table.gateway.service.operation.OperationManager$$Lambda$390/208625838.call(Unknown
 Source)
at 
org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258)
at 
org.apache.flink.table.gateway.service.operation.OperationManager$Operation$$Lambda$392/670621032.run(Unknown
 Source)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
{code}
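
For reference, a minimal Table API sequence that mirrors the failing COMPILE
PLAN / EXECUTE PLAN flow in the SQL client (table definitions below are
hypothetical placeholders, not from the original report):

{code:java}
import org.apache.flink.table.api.CompiledPlan;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CompiledPlanRepro {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tEnv.executeSql(
                "CREATE TABLE src (a INT) WITH"
                        + " ('connector' = 'datagen', 'number-of-rows' = '10')");
        tEnv.executeSql("CREATE TABLE snk (a INT) WITH ('connector' = 'blackhole')");
        // Compile first, then execute the plan, as the SQL client statements do.
        CompiledPlan plan = tEnv.compilePlanSql("INSERT INTO snk SELECT a FROM src");
        // The client-side hang reported above occurs while fetching the insert result.
        plan.execute().await();
    }
}
{code}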




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33689) jsonObjectAggFunction can't retract previous data which is invalid when enable local global agg

2023-11-28 Thread Shuai Xu (Jira)
Shuai Xu created FLINK-33689:


 Summary: jsonObjectAggFunction can't retract previous data which 
is invalid when enable local global agg
 Key: FLINK-33689
 URL: https://issues.apache.org/jira/browse/FLINK-33689
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Affects Versions: 1.18.0
Reporter: Shuai Xu


Run the following test in sql/AggregateITCase with LocalGlobal and minibatch
enabled.
{code:java}
// code placeholder
def testGroupJsonObjectAggWithRetract(): Unit = {
  val data = new mutable.MutableList[(Long, String, Long)]
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  data.+=((2L, "Hallo", 2L))
  val sql =
s"""
   |select
   |   JSON_OBJECTAGG(key k value v)
   |from (select
   | cast(SUM(a) as string) as k,t as v
   |   from
   | Table6
   |   group by t)
   |""".stripMargin
  val t = failingDataSource(data).toTable(tEnv, 'a, 'c, 't)
  tEnv.createTemporaryView("Table6", t)
  val sink = new TestingRetractSink
  tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1)
  env.execute()
  val expected =
List(
  "{\"30\":2}"
)
  assertThat(sink.getRetractResults).isEqualTo(expected)
} {code}
The result is as follows.
{code:java}
// code placeholder
List({"14":2,"30":2}) {code}
However, {"14":2} should be retracted.
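
For intuition, a toy sketch of the accumulate/retract sequence the aggregate
should apply (values taken from the test above; a plain map stands in for the
accumulator):

{code:java}
import java.util.LinkedHashMap;
import java.util.Map;

public class RetractSketch {
    public static void main(String[] args) {
        Map<String, Long> acc = new LinkedHashMap<>();
        // SUM(a) grows as records arrive, so the upstream agg emits -U/+U pairs:
        acc.put("14", 2L);   // +U ("14", 2): accumulate intermediate sum 14
        acc.remove("14");    // -U ("14", 2): the retraction lost in the buggy run
        acc.put("30", 2L);   // +U ("30", 2): accumulate final sum 30
        System.out.println(acc); // correct: {30=2}; the buggy run keeps {14=2, 30=2}
    }
}
{code}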



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34219) Introduce a new join operator to support minibatch

2024-01-23 Thread Shuai Xu (Jira)
Shuai Xu created FLINK-34219:


 Summary: Introduce a new join operator to support minibatch
 Key: FLINK-34219
 URL: https://issues.apache.org/jira/browse/FLINK-34219
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / Runtime
Reporter: Shuai Xu
 Fix For: 1.19.0


This is the parent task of FLIP-415.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34220) introduce buffer bundle for minibatch join

2024-01-23 Thread Shuai Xu (Jira)
Shuai Xu created FLINK-34220:


 Summary: introduce buffer bundle for minibatch join
 Key: FLINK-34220
 URL: https://issues.apache.org/jira/browse/FLINK-34220
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Runtime
Reporter: Shuai Xu
 Fix For: 1.19.0


Introduce a buffer bundle for storing records to implement minibatch join.
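
A minimal sketch of the buffer bundle idea (class and method names here are
hypothetical, not Flink's actual API): records are parked per join key and
drained when the minibatch fires.

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a per-key buffer bundle for minibatch join.
public class BufferBundleSketch<K, V> {
    private final Map<K, Deque<V>> bundle = new HashMap<>();

    /** Buffers a record under its join key instead of joining immediately. */
    public void addRecord(K key, V record) {
        bundle.computeIfAbsent(key, k -> new ArrayDeque<>()).add(record);
    }

    /** Drained when the minibatch triggers; the operator then joins per key. */
    public Map<K, Deque<V>> getRecords() {
        return bundle;
    }

    public void clear() {
        bundle.clear();
    }
}
{code}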



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34221) Introduce operator for minibatch join

2024-01-23 Thread Shuai Xu (Jira)
Shuai Xu created FLINK-34221:


 Summary: Introduce operator for minibatch join
 Key: FLINK-34221
 URL: https://issues.apache.org/jira/browse/FLINK-34221
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Runtime
Reporter: Shuai Xu
 Fix For: 1.19.0


Introduce the operator that implements minibatch join.
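
For intuition, a hedged sketch of the bundle lifecycle such an operator follows
(all names are illustrative; the real operator additionally manages state,
timers, and changelog folding):

{code:java}
import java.util.ArrayList;
import java.util.List;

// Illustrative shape only: buffer on processElement, join once per bundle flush.
public class MiniBatchJoinLifecycle<T> {
    private final List<T> bundle = new ArrayList<>();
    private final int maxBundleSize;

    public MiniBatchJoinLifecycle(int maxBundleSize) {
        this.maxBundleSize = maxBundleSize;
    }

    /** Buffers instead of joining record-by-record. */
    public void processElement(T record) {
        bundle.add(record);
        if (bundle.size() >= maxBundleSize) {
            finishBundle();
        }
    }

    /** Invoked on a size or time trigger; joins the folded bundle, then resets. */
    private void finishBundle() {
        // join logic over the buffered records would run here
        bundle.clear();
    }
}
{code}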



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34222) Get minibatch join operator involved

2024-01-23 Thread Shuai Xu (Jira)
Shuai Xu created FLINK-34222:


 Summary: Get minibatch join operator involved
 Key: FLINK-34222
 URL: https://issues.apache.org/jira/browse/FLINK-34222
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Runtime
Reporter: Shuai Xu
 Fix For: 1.19.0


Get the minibatch join operator involved, which includes both the plan and the
operator changes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34256) Add a documentation section for minibatch join

2024-01-28 Thread Shuai Xu (Jira)
Shuai Xu created FLINK-34256:


 Summary: Add a documentation section for minibatch join
 Key: FLINK-34256
 URL: https://issues.apache.org/jira/browse/FLINK-34256
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation
Affects Versions: 1.19.0
Reporter: Shuai Xu


We should add a minibatch join section to Performance Tuning to explain the
usage and principles of minibatch join.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34349) Release Testing: Verify FLINK-34219 Introduce a new join operator to support minibatch

2024-02-03 Thread Shuai Xu (Jira)
Shuai Xu created FLINK-34349:


 Summary: Release Testing: Verify FLINK-34219 Introduce a new join 
operator to support minibatch
 Key: FLINK-34349
 URL: https://issues.apache.org/jira/browse/FLINK-34349
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Runtime
Affects Versions: 1.19.0
Reporter: Shuai Xu
Assignee: Shuai Xu
 Fix For: 1.19.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34462) Session window with negative parameter throws unclear exception

2024-02-19 Thread Shuai Xu (Jira)
Shuai Xu created FLINK-34462:


 Summary: Session window with negative parameter throws unclear 
exception
 Key: FLINK-34462
 URL: https://issues.apache.org/jira/browse/FLINK-34462
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.19.0
Reporter: Shuai Xu


Setting an invalid parameter in a session window produces an unclear error.
{code:java}
// add test in WindowAggregateITCase
def testEventTimeSessionWindowWithInvalidName(): Unit = {
  val sql =
"""
  |SELECT
  |  window_start,
  |  window_end,
  |  COUNT(*),
  |  SUM(`bigdec`),
  |  MAX(`double`),
  |  MIN(`float`),
  |  COUNT(DISTINCT `string`),
  |  concat_distinct_agg(`string`)
  |FROM TABLE(
  |   SESSION(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '-5' SECOND))
  |GROUP BY window_start, window_end
""".stripMargin

  val sink = new TestingAppendSink
  tEnv.sqlQuery(sql).toDataStream.addSink(sink)
  env.execute()
} 

{code}
{code:java}
java.lang.AssertionError: Sql optimization: Assertion error: null at 
org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:79)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
 at 
scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156) 
at 
scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
 at scala.collection.Iterator.foreach(Iterator.scala:937) at 
scala.collection.Iterator.foreach$(Iterator.scala:937) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at 
scala.collection.IterableLike.foreach(IterableLike.scala:70) at 
scala.collection.IterableLike.foreach$(IterableLike.scala:69) at 
scala.collection.AbstractIterable.foreach(Iterable.scala:54) at 
scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156) at 
scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154) at 
scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
 at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:176)
 at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
 at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
 at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:320)
 at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:178)
 at 
org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224)
 at 
org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:219)
 at 
org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:151)
 at 
org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:128)
 at 
org.apache.flink.table.api.bridge.scala.TableConversions.toDataStream(TableConversions.scala:60)
 at 
org.apache.flink.table.planner.runtime.stream.sql.WindowAggregateITCase.testEventTimeSessionWindowWithInvalidName(WindowAggregateITCase.scala:1239)
 at java.lang.reflect.Method.invoke(Method.java:498) at 
java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at 
java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175) at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at 
java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at 
java.util.Iterator.forEachRemaining(Iterator.java:116) at 
scala.collection.convert.Wrappers$IteratorWrapper.forEachRemaining(Wrappers.scala:26)
 at 
java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
 at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) at 
java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) 
at 
java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
 at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at 
ja
{code}
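
For context, the kind of early validation this issue asks for, as a hedged
sketch (not the planner's actual code path):

{code:java}
import java.time.Duration;

public class SessionGapValidation {
    // Hypothetical check; a clear message beats the planner's bare AssertionError.
    static void validateGap(Duration gap) {
        if (gap.isNegative() || gap.isZero()) {
            throw new IllegalArgumentException(
                    "SESSION window gap must be positive, but was: " + gap);
        }
    }

    public static void main(String[] args) {
        validateGap(Duration.ofSeconds(-5)); // mirrors INTERVAL '-5' SECOND above
    }
}
{code}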

[jira] [Created] (FLINK-36654) Decimal divide Integer reports Null pointer exception

2024-11-04 Thread Shuai Xu (Jira)
Shuai Xu created FLINK-36654:


 Summary: Decimal divide Integer reports Null pointer exception
 Key: FLINK-36654
 URL: https://issues.apache.org/jira/browse/FLINK-36654
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 2.0-preview
Reporter: Shuai Xu


Paste the test code into SqlExpressionTest.

 
{code:java}
// code placeholder
// Decimal(2,1) / Integer => Decimal(13,12)
testSqlApi("1.0/400", "0.0025")

// Decimal(2,1) / BigInteger => Decimal(22,21)
testSqlApi("1.0/1000", "0.00100")

// Decimal(2,1) / TinyInt => Decimal(7,6)
testSqlApi("1.0/cast(100 as TINYINT)", "0.01")

// Decimal(2,1) / SmallInt => Decimal(8,7)
testSqlApi("1.0/cast(1 as SMALLINT)", "0.0001000") {code}
and get the following exception:
{code:java}
org.apache.flink.table.planner.expressions.SqlExpressionTest,testDivide
java.lang.AssertionError: Error when executing the expression. Expression code:
            // Using option 'table.exec.legacy-cast-behaviour':'false'      // 
Timezone: org.apache.flink.table.api.TableConfig@61150d94                public 
class TestFunction$20          extends 
org.apache.flink.api.common.functions.RichMapFunction {
                org.apache.flink.table.data.DecimalData decimal$0 = 
org.apache.flink.table.data.DecimalDataUtils.castFrom(        "1.0",        2,  
      1);                        org.apache.flink.table.data.DecimalData 
decimal$5 = org.apache.flink.table.data.DecimalDataUtils.castFrom(        
"1.0",        2,        1);                        
org.apache.flink.table.data.DecimalData decimal$10 = 
org.apache.flink.table.data.DecimalDataUtils.castFrom(        "1.0",        2,  
      1);                        org.apache.flink.table.data.DecimalData 
decimal$15 = org.apache.flink.table.data.DecimalDataUtils.castFrom(        
"1.0",        2,        1);                
org.apache.flink.table.data.binary.BinaryRowData out = new 
org.apache.flink.table.data.binary.BinaryRowData(4);        
org.apache.flink.table.data.writer.BinaryRowWriter outWriter = new 
org.apache.flink.table.data.writer.BinaryRowWriter(out);
        public TestFunction$20(Object[] references) throws Exception {          
        }
        
        @Override        public void 
open(org.apache.flink.api.common.functions.OpenContext openContext) throws 
Exception {                  }
        @Override        public Object map(Object _in1) throws Exception {      
    org.apache.flink.table.data.RowData in1 = 
(org.apache.flink.table.data.RowData) _in1;                    boolean 
isNull$1;          org.apache.flink.table.data.DecimalData result$2;          
boolean isNull$3;          org.apache.flink.table.data.binary.BinaryStringData 
result$4;          boolean isNull$6;          
org.apache.flink.table.data.DecimalData result$7;          boolean isNull$8;    
      org.apache.flink.table.data.binary.BinaryStringData result$9;          
boolean isNull$11;          org.apache.flink.table.data.DecimalData result$12;  
        boolean isNull$13;          
org.apache.flink.table.data.binary.BinaryStringData result$14;          boolean 
isNull$16;          org.apache.flink.table.data.DecimalData result$17;          
boolean isNull$18;          org.apache.flink.table.data.binary.BinaryStringData 
result$19;                                                  outWriter.reset();  
                                                isNull$1 = false || false;      
    result$2 = null;          if (!isNull$1) {                                
result$2 = 
org.apache.flink.table.data.DecimalDataUtils.divide(((org.apache.flink.table.data.DecimalData)
 decimal$0), 
org.apache.flink.table.data.DecimalDataUtils.castFrom(((long)(((int) 400))), 
13, 12), 13, 12);                      isNull$1 = (result$2 == null);          
}                     // --- Cast section generated by 
org.apache.flink.table.planner.functions.casting.CharVarCharTrimPadCastRule     
      isNull$3 = isNull$1;          if (!isNull$3) {          result$4 = 
org.apache.flink.table.data.binary.BinaryStringData.fromString("" + result$2);  
        isNull$3 = result$4 == null;          } else {          result$4 = 
org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;          }      
               // --- End cast section                                   if 
(isNull$3) {            outWriter.setNullAt(0);          } else {            
outWriter.writeString(0, result$4);          }                                  
                                     isNull$6 = false || false;          
result$7 = null;          if (!isNull$6) {                                
result$7 = 
org.apache.flink.table.data.DecimalDataUtils.divide
{code}
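
For context, the comments in the test above follow the decimal division typing
rule sketched below; the rule is inferred from those comments, so treat the
arithmetic as illustrative:

{code:java}
public class DecimalDivideType {
    public static void main(String[] args) {
        // DECIMAL(p1,s1) / DECIMAL(p2,s2); integer operands are first widened,
        // e.g. INT -> DECIMAL(10,0), as the comments in the test assume.
        int p1 = 2, s1 = 1;   // the literal 1.0
        int p2 = 10, s2 = 0;  // INT
        int scale = Math.max(6, s1 + p2 + 1);  // 12
        int precision = p1 - s1 + s2 + scale;  // 13  => DECIMAL(13, 12)
        System.out.println("DECIMAL(" + precision + ", " + scale + ")");
    }
}
{code}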

[jira] [Created] (FLINK-36962) push down non-deterministic filter after stream join to source by mistake

2024-12-25 Thread Shuai Xu (Jira)
Shuai Xu created FLINK-36962:


 Summary: push down non-deterministic filter after stream join to 
source by mistake
 Key: FLINK-36962
 URL: https://issues.apache.org/jira/browse/FLINK-36962
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 2.0-preview
Reporter: Shuai Xu


A non-deterministic filter after a stream join is pushed down to the source by
mistake. Because NOW() is evaluated at a different point once the filter moves
below the join, pushing it down can change which rows participate in the join
and alter the result.

Modify org.apache.flink.table.planner.plan.stream.sql.CalcTest with the
following snippet of code.
 
{code:java}
@BeforeEach
def setup(): Unit = {
  util.addTableSource[(Long, Int, String)]("MyTable", 'a, 'b, 'c)
  util.addTableSource[(Long, Int, String)]("SourceTable", 'a, 'b, 'c)
  util.addTemporarySystemFunction("random_udf", new NonDeterministicUdf)
}

@Test
def testCalcWithNonDeterministicFilterAfterJoin(): Unit = {
  val sqlQuery =
"SELECT a FROM (SELECT t1.a, t1.c as t1c, t2.c as t2c FROM MyTable t1 join 
SourceTable t2 on t1.b = t2.b) t " +
  "WHERE TO_TIMESTAMP(t.t1c, '-MM-dd HH:mm:ss') > TIMESTAMPADD(HOUR, 
-2, NOW()) and t.t2c > '2022-01-01 00:00:00'"
  util.verifyRelPlan(sqlQuery)
}
{code}
we expected the plan with 
{code:java}
Calc(select=[a], where=[>(TO_TIMESTAMP(c, 'yyyy-MM-dd HH:mm:ss'), +(NOW(), 
-720:INTERVAL HOUR))])+- Join(joinType=[InnerJoin], where=[=(b, b0)], 
select=[a, b, c, b0], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[NoUniqueKey])   :- Exchange(distribution=[hash[b]])   :  +- 
LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])   +- 
Exchange(distribution=[hash[b]])  +- Calc(select=[b], where=[>(c, 
'2022-01-01 00:00:00')]) +- 
LegacyTableSourceScan(table=[[default_catalog, default_database, SourceTable, 
source: [TestTableSource(a, b, c)]]], fields=[a, b, c]){code}
but the plan is
{code:java}
Calc(select=[a])
+- Join(joinType=[InnerJoin], where=[=(b, b0)], select=[a, b, b0], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
   :- Exchange(distribution=[hash[b]])
   :  +- Calc(select=[a, b], where=[>(TO_TIMESTAMP(c, 'yyyy-MM-dd HH:mm:ss'), 
+(NOW(), -720:INTERVAL HOUR))])
   : +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
   +- Exchange(distribution=[hash[b]])
  +- Calc(select=[b], where=[>(c, '2022-01-01 00:00:00')])
 +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
SourceTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-37640) Type timestamp_ltz is converted timestamp wrongly when scanreuse

2025-04-09 Thread Shuai Xu (Jira)
Shuai Xu created FLINK-37640:


 Summary: Type timestamp_ltz is converted timestamp wrongly when 
scanreuse
 Key: FLINK-37640
 URL: https://issues.apache.org/jira/browse/FLINK-37640
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 2.0.0
Reporter: Shuai Xu


When a watermark is defined on a column of type timestamp_ltz in the source
DDL, the type of the field is wrongly converted to timestamp after scan reuse
during planner optimization.
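
A hedged reproduction sketch of such a source (table and column names are
hypothetical; the EXPLAIN merely exercises a plan where both branches can share
one reused scan):

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TimestampLtzWatermarkRepro {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Watermark defined on a TIMESTAMP_LTZ column, as described above.
        tEnv.executeSql(
                "CREATE TABLE src ("
                        + " id BIGINT,"
                        + " ts TIMESTAMP_LTZ(3),"
                        + " WATERMARK FOR ts AS ts - INTERVAL '5' SECOND"
                        + ") WITH ('connector' = 'datagen')");
        // Both branches read the same source, so the optimizer can reuse the scan;
        // per this ticket, the reused field's type can flip to TIMESTAMP.
        tEnv.executeSql("EXPLAIN SELECT ts FROM src UNION ALL SELECT ts FROM src")
                .print();
    }
}
{code}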



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-37875) Introduce the option and the support of planner

2025-05-30 Thread Shuai Xu (Jira)
Shuai Xu created FLINK-37875:


 Summary: Introduce the option and the support of planner
 Key: FLINK-37875
 URL: https://issues.apache.org/jira/browse/FLINK-37875
 Project: Flink
  Issue Type: Sub-task
Reporter: Shuai Xu


This includes: 
1. options
2. planner changes



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-37877) Add the key ordered operator in table runtime

2025-05-30 Thread Shuai Xu (Jira)
Shuai Xu created FLINK-37877:


 Summary: Add the key ordered operator in table runtime
 Key: FLINK-37877
 URL: https://issues.apache.org/jira/browse/FLINK-37877
 Project: Flink
  Issue Type: Sub-task
Reporter: Shuai Xu


This aims to complete the feature by adding the key-ordered operator in table
runtime.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-37874) FLIP-519: Introduce async lookup key ordered mode

2025-05-30 Thread Shuai Xu (Jira)
Shuai Xu created FLINK-37874:


 Summary: FLIP-519: Introduce async lookup key ordered mode
 Key: FLINK-37874
 URL: https://issues.apache.org/jira/browse/FLINK-37874
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / API, Table SQL / Planner
Affects Versions: 2.1.0
Reporter: Shuai Xu


This FLIP aims to introduce an async key-ordered lookup join to Flink,
addressing the need to process records in key-specific order (e.g., maintaining
the update sequence for an upsertKey) while enabling concurrent processing of
different keys to boost throughput.

https://cwiki.apache.org/confluence/display/FLINK/FLIP-519%3A++Introduce+async+lookup+key+ordered+mode



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-37876) Introduce key ordered component in runtime

2025-05-30 Thread Shuai Xu (Jira)
Shuai Xu created FLINK-37876:


 Summary: Introduce key ordered component in runtime
 Key: FLINK-37876
 URL: https://issues.apache.org/jira/browse/FLINK-37876
 Project: Flink
  Issue Type: Sub-task
Reporter: Shuai Xu


This mainly introduces the async execution controller framework in table
runtime.
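
A minimal sketch of the key-ordered idea (names are hypothetical, not Flink's
actual controller): requests for the same key are chained so they run
sequentially, while different keys proceed concurrently.

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class KeyOrderedExecutorSketch<K, T, R> {
    // Tail of the in-flight request chain per key.
    private final Map<K, CompletableFuture<?>> tails = new HashMap<>();

    /** Chains the async call behind the previous in-flight request for the key. */
    public synchronized CompletableFuture<R> submit(
            K key, T input, Function<T, CompletableFuture<R>> asyncFn) {
        CompletableFuture<?> tail =
                tails.getOrDefault(key, CompletableFuture.completedFuture(null));
        CompletableFuture<R> next = tail.thenCompose(ignored -> asyncFn.apply(input));
        tails.put(key, next);
        return next;
    }
}
{code}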



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-37921) Refactor current implementation with implementation in FLIP-425

2025-06-09 Thread Shuai Xu (Jira)
Shuai Xu created FLINK-37921:


 Summary: Refactor current implementation with implementation in 
FLIP-425
 Key: FLINK-37921
 URL: https://issues.apache.org/jira/browse/FLINK-37921
 Project: Flink
  Issue Type: Sub-task
Reporter: Shuai Xu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-37947) Add sql hint to enable this optimization

2025-06-12 Thread Shuai Xu (Jira)
Shuai Xu created FLINK-37947:


 Summary: Add sql hint to enable this optimization
 Key: FLINK-37947
 URL: https://issues.apache.org/jira/browse/FLINK-37947
 Project: Flink
  Issue Type: Sub-task
Reporter: Shuai Xu


Currently, this optimization can only be enabled via configuration options. For
users, a SQL hint would be an easier way to configure it.
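
For illustration, one hypothetical shape such a hint could take, extending the
existing LOOKUP join hint; 'table' and 'async' are real LOOKUP hint options,
while the 'key-ordered' option below does not exist and is exactly what this
ticket would define:

{code:java}
public class KeyOrderedHintExample {
    public static void main(String[] args) {
        // Hypothetical SQL only; 'key-ordered' is an invented placeholder.
        String sql =
                "SELECT /*+ LOOKUP('table'='dim', 'async'='true', 'key-ordered'='true') */"
                        + " o.id, d.name"
                        + " FROM orders AS o"
                        + " JOIN dim FOR SYSTEM_TIME AS OF o.proc_time AS d"
                        + " ON o.dim_id = d.id";
        System.out.println(sql);
    }
}
{code}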



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-38032) Release Testing: Verify FLIP-519: Introduce async lookup key ordered mode

2025-06-30 Thread Shuai Xu (Jira)
Shuai Xu created FLINK-38032:


 Summary: Release Testing: Verify FLIP-519: Introduce async lookup 
key ordered mode
 Key: FLINK-38032
 URL: https://issues.apache.org/jira/browse/FLINK-38032
 Project: Flink
  Issue Type: Sub-task
Reporter: Shuai Xu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)