Re: Re: [DISCUSS] FLIP-213: TaskManager's Flame Graphs

2022-11-05 Thread Yun Tang
Hi Jacky,

Apart from the ongoing discussion, I think you could share the current progress,
or even make it public as one of the flink-packages [1].


[1] https://flink-packages.org/


Best
Yun Tang

From: Jacky Lau <281293...@qq.com.INVALID>
Sent: Friday, February 11, 2022 10:11
To: dev@flink.apache.org 
Subject: RE: Re: [DISCUSS] FLIP-213: TaskManager's Flame Graphs

Our Flink applications run on Kubernetes. Yes, users can use async-profiler 
directly, but that is not convenient: they have to download the binaries and 
learn how to use the tool, and some users are not aware of it at all. If we 
integrate it, users will benefit a lot.
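To illustrate what users currently have to do by hand: async-profiler can emit collapsed stacks (e.g. with `-o collapsed`), which then have to be aggregated before a flame graph can be rendered. A rough sketch of that aggregation step (the sample data and function name are made up for illustration, not part of the FLIP):

```python
from collections import Counter

def aggregate_collapsed_stacks(lines):
    """Sum sample counts per call stack from async-profiler's collapsed
    output, where each line is "frameA;frameB;frameC <count>"."""
    totals = Counter()
    for line in lines:
        # Split on the last space: everything before it is the stack,
        # everything after it is the sample count.
        stack, _, count = line.rpartition(" ")
        totals[stack] += int(count)
    return totals

# Made-up samples: the same stack can appear on multiple lines.
totals = aggregate_collapsed_stacks(["a;b 10", "a;b 5", "a;c 3"])
assert totals["a;b"] == 15
```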

On 2022/01/26 18:56:17 David Morávek wrote:
> I'd second to Alex's concerns. Is there a reason why you can't use the
> async-profiler directly? In what kind of environment are your Flink
> clusters running (YARN / k8s / ...)?
>
> Best,
> D.
>
> On Wed, Jan 26, 2022 at 4:32 PM Alexander Fedulov 
> wrote:
>
>> Hi Jacky,
>>
>> Could you please clarify what kind of *problems* you experience with the
>> large parallelism? You referred to D3, is it something related to rendering
>> on the browser side or is it about the samples collection process? Were you
>> able to identify the bottleneck?
>>
>> Fundamentally I have some concerns regarding the proposed approach:
>> 1. Calling shell scripts triggered via the web UI is a security concern and
>> it needs to be evaluated carefully if it could introduce any unexpected
>> attack vectors (depending on the implementation, passed parameters etc.)
>> 2. My understanding is that the async-profiler implementation is
>> system-dependent. How do you propose to handle multiple architectures?
>> Would you like to ship each available implementation within Flink? [1]
>> 3. Do you plan to make use of full async-profiler features including native
>> calls sampling with perf_events? If so, the issue I see is that some
>> environments restrict ptrace calls by default [2]
>>
>> [1] https://github.com/jvm-profiling-tools/async-profiler#download
>> [2]
>>
>> https://kubernetes.io/docs/concepts/policy/pod-security-policy/#host-namespaces
>>
>>
>> Best,
>> Alexander Fedulov
>>
>> On Wed, Jan 26, 2022 at 1:59 PM 李森  wrote:
>>
>>> This is an expected feature, as we also experienced browser crashes with
>>> the existing operator-level flame graphs.
>>>
>>> Best,
>>> Echo Lee
>>>
 On Jan 24, 2022, at 6:16 PM, David Morávek wrote:

 Hi Jacky,

 The link seems to be broken, here is the correct one [1].

 [1]

>>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-213%3A+TaskManager%27s+Flame+Graphs

 Best,
 D.

> On Mon, Jan 24, 2022 at 9:48 AM Jacky Lau <28...@qq.com.invalid>
>>> wrote:
>
> Hi All,
>     I would like to start the discussion on FLIP-213 <
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-213%3A+TaskManager%27s+Flame+Graphs>,
> which aims to provide TaskManager-level (process-level) flame graphs
> via async-profiler, one of the most popular Java performance tools;
> both Arthas and IntelliJ use it under the hood.
> We already support this internally at Ant Group.
>     Flink currently supports FLIP-165: Operator's Flame Graphs,
> which renders flame graphs with the front-end
> library d3-flame-graph and has problems with jobs
> of large parallelism.
>     Please be aware that the FLIP wiki page is not fully done,
> since I don't know yet whether it will be accepted by
> the Flink community.
>     Feel free to add your thoughts to make this feature
> better! I am looking forward to all your responses. Thanks so much!
>
> Best,
> Jacky Lau
>>>
>>


Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-05 Thread Niels Basjes
I'm really looking forward to seeing this in action.

Niels

On Fri, 4 Nov 2022, 19:37 Maximilian Michels,  wrote:

> Hi,
>
> I would like to kick off the discussion on implementing autoscaling for
> Flink as part of the Flink Kubernetes operator. I've outlined an approach
> here which I find promising:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-271%3A+Autoscaling
>
> I've been discussing this approach with some of the operator contributors:
> Gyula, Marton, Matyas, and Thomas (all in CC). We started prototyping an
> implementation based on the current FLIP design. If that goes well, we
> would like to contribute this to Flink based on the results of the
> discussion here.
>
> I'm curious to hear your thoughts.
>
> -Max
>


[jira] [Created] (FLINK-29900) Implement Table API for DynamoDB Sink

2022-11-05 Thread Hong Liang Teoh (Jira)
Hong Liang Teoh created FLINK-29900:
---

 Summary: Implement Table API for DynamoDB Sink
 Key: FLINK-29900
 URL: https://issues.apache.org/jira/browse/FLINK-29900
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / DynamoDB
Reporter: Hong Liang Teoh


Implement table API support for DynamoDB sink



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29901) Support “NOT IN” built-in function in Table API

2022-11-05 Thread zhangjingcun (Jira)
zhangjingcun created FLINK-29901:


 Summary: Support “NOT IN” built-in function in Table API
 Key: FLINK-29901
 URL: https://issues.apache.org/jira/browse/FLINK-29901
 Project: Flink
  Issue Type: New Feature
  Components: API / Python, Table SQL / API
Affects Versions: 1.16.0
Reporter: zhangjingcun






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29902) Support “value1.notIn(TABLE)” built-in function in Table API

2022-11-05 Thread zhangjingcun (Jira)
zhangjingcun created FLINK-29902:


 Summary: Support “value1.notIn(TABLE)” built-in function in Table 
API
 Key: FLINK-29902
 URL: https://issues.apache.org/jira/browse/FLINK-29902
 Project: Flink
  Issue Type: New Feature
  Components: API / Python, Table SQL / API
Reporter: zhangjingcun






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29903) Support “string1.notSimilar(string2)” built-in function in Table API

2022-11-05 Thread zhangjingcun (Jira)
zhangjingcun created FLINK-29903:


 Summary: Support “string1.notSimilar(string2)” built-in function 
in Table API
 Key: FLINK-29903
 URL: https://issues.apache.org/jira/browse/FLINK-29903
 Project: Flink
  Issue Type: New Feature
  Components: API / Python, Table SQL / API
Affects Versions: 1.16.0
Reporter: zhangjingcun






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29904) Support “string1.notLike(string2)” built-in function in Table API

2022-11-05 Thread zhangjingcun (Jira)
zhangjingcun created FLINK-29904:


 Summary: Support “string1.notLike(string2)” built-in function in 
Table API
 Key: FLINK-29904
 URL: https://issues.apache.org/jira/browse/FLINK-29904
 Project: Flink
  Issue Type: New Feature
  Components: API / Python, Table SQL / API
Affects Versions: 1.16.0
Reporter: zhangjingcun






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-05 Thread Márton Balassi
Thanks for preparing the FLIP and kicking off the discussion, Max. Looking
forward to this. :-)

On Sat, Nov 5, 2022 at 9:27 AM Niels Basjes  wrote:

> I'm really looking forward to seeing this in action.
>
> Niels
>
> On Fri, 4 Nov 2022, 19:37 Maximilian Michels,  wrote:
>
>> Hi,
>>
>> I would like to kick off the discussion on implementing autoscaling for
>> Flink as part of the Flink Kubernetes operator. I've outlined an approach
>> here which I find promising:
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-271%3A+Autoscaling
>>
>> I've been discussing this approach with some of the operator contributors:
>> Gyula, Marton, Matyas, and Thomas (all in CC). We started prototyping an
>> implementation based on the current FLIP design. If that goes well, we
>> would like to contribute this to Flink based on the results of the
>> discussion here.
>>
>> I'm curious to hear your thoughts.
>>
>> -Max
>>
>


Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-05 Thread Biao Geng
Hi Max,

Thanks a lot for the FLIP. It is an extremely attractive feature!

Just some follow-up questions/thoughts after reading the FLIP:
In the doc, the discussion of the “scaling out” strategy is thorough and 
convincing to me, but “scaling in” seems less discussed. I have two cents 
on this aspect:

  1.  For source parallelism, if the user configures a much larger value than 
necessary, there should be very few pending records, even though there is 
room to optimize. But IIUC, the current algorithm will not take action in 
this case, as the backlog growth rate is almost zero. Is that understanding right?
  2.  Compared with “scaling out”, “scaling in” is usually more dangerous, as 
it is more likely to negatively affect downstream jobs. The min/max load 
bounds should be useful here. I am wondering if it is possible to have a 
different, more conservative strategy for “scaling in”, or, more ambitiously, 
to allow custom autoscaling strategies (e.g. a time-based strategy).

Another side thought: to recover a job from a checkpoint/savepoint, the new 
parallelism cannot be larger than the max parallelism recorded in the 
checkpoint (see 
https://github.com/apache/flink/blob/17a782c202c93343b8884cb52f4562f9c4ba593f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L128).
 Not sure if this limit should be mentioned in the FLIP.
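To illustrate, the limit boils down to a simple clamp that any computed parallelism would need to pass through (a hypothetical sketch, not the operator's actual code):

```python
def clamp_restored_parallelism(desired, checkpoint_max_parallelism):
    """Cap a computed parallelism at the max parallelism recorded in the
    checkpoint being restored, since Flink rejects anything above it,
    and keep it at least 1."""
    return max(1, min(desired, checkpoint_max_parallelism))

# An autoscaler asking for 200 subtasks against a checkpoint with
# max parallelism 128 must settle for 128.
assert clamp_restored_parallelism(200, 128) == 128
```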

Again, thanks for the great work, and looking forward to using the Flink 
Kubernetes operator with it!

Best,
Biao Geng

From: Maximilian Michels 
Date: Saturday, November 5, 2022 at 2:37 AM
To: dev 
Cc: Gyula Fóra , Thomas Weise , Marton 
Balassi , Őrhidi Mátyás 
Subject: [DISCUSS] FLIP-271: Autoscaling
Hi,

I would like to kick off the discussion on implementing autoscaling for
Flink as part of the Flink Kubernetes operator. I've outlined an approach
here which I find promising:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-271%3A+Autoscaling

I've been discussing this approach with some of the operator contributors:
Gyula, Marton, Matyas, and Thomas (all in CC). We started prototyping an
implementation based on the current FLIP design. If that goes well, we
would like to contribute this to Flink based on the results of the
discussion here.

I'm curious to hear your thoughts.

-Max


Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-05 Thread Gyula Fóra
Hey!

Thanks for the input!

The algorithm does not really differentiate between scaling up and scaling
down, as it is concerned with finding the right parallelism to match the
target processing rate with just enough spare capacity.

Let me try to address your specific points:

1. The backlog growth rate only matters for computing the target processing
rate for the sources. If the parallelism is high enough and there is no
back pressure, it will be close to 0, so the target rate is the source read
rate. This is as intended. If we see that the sources are not busy and can
read more than enough, the algorithm would scale them down.

2. You are right, it’s dangerous to scale in too much, so we already
thought about limiting the scale-down amount per scaling step/time window
to give more safety. But we can definitely think about different strategies
in the future!

The observation regarding max parallelism is very important and we should
always take that into consideration.
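The idea above (derive a target rate from the source read rate plus backlog growth, then size each vertex to meet that rate with some headroom) can be sketched roughly as follows; all names, the utilization factor, and the exact formula are illustrative assumptions on my part, not the FLIP's actual code:

```python
import math

def target_source_rate(read_rate, backlog_growth_rate):
    # Target rate: keep up with what the source reads now, plus whatever
    # is accumulating as backlog. With no back pressure the growth rate
    # is ~0 and the target collapses to the read rate.
    return read_rate + max(backlog_growth_rate, 0.0)

def required_parallelism(target_rate, per_subtask_rate,
                         utilization=0.8, max_parallelism=128):
    # Size the vertex so it meets the target rate while running at the
    # desired utilization, leaving (1 - utilization) as spare capacity.
    raw = target_rate / (per_subtask_rate * utilization)
    return min(max(math.ceil(raw), 1), max_parallelism)

# 1000 rec/s target, 100 rec/s per subtask, 80% utilization -> 13 subtasks.
assert required_parallelism(1000.0, 100.0) == 13
```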

Cheers
Gyula

On Sat, 5 Nov 2022 at 11:46, Biao Geng  wrote:

> Hi Max,
>
> Thanks a lot for the FLIP. It is an extremely attractive feature!
>
> Just some follow up questions/thoughts after reading the FLIP:
> In the doc, the discussion of  the strategy of “scaling out” is thorough
> and convincing to me but it seems that “scaling down” is less discussed. I
> have 2 cents for this aspect:
>
>   1.  For source parallelisms, if the user configure a much larger value
> than normal, there should be very little pending records though it is
> possible to get optimized. But IIUC, in current algorithm, we will not take
> actions for this case as the backlog growth rate is almost zero. Is the
> understanding right?
>   2.  Compared with “scaling out”, “scaling in” is usually more dangerous
> as it is more likely to lead to negative influence to the downstream jobs.
> The min/max load bounds should be useful. I am wondering if it is possible
> to have different strategy for “scaling in” to make it more conservative.
> Or more eagerly, allow custom autoscaling strategy(e.g. time-based
> strategy).
>
> Another side thought is that to recover a job from checkpoint/savepoint,
> the new parallelism cannot be larger than max parallelism defined in the
> checkpoint(see this<
> https://github.com/apache/flink/blob/17a782c202c93343b8884cb52f4562f9c4ba593f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L128>).
> Not sure if this limit should be mentioned in the FLIP.
>
> Again, thanks for the great work and looking forward to using flink k8s
> operator with it!
>
> Best,
> Biao Geng
>
> From: Maximilian Michels 
> Date: Saturday, November 5, 2022 at 2:37 AM
> To: dev 
> Cc: Gyula Fóra , Thomas Weise ,
> Marton Balassi , Őrhidi Mátyás <
> matyas.orh...@gmail.com>
> Subject: [DISCUSS] FLIP-271: Autoscaling
> Hi,
>
> I would like to kick off the discussion on implementing autoscaling for
> Flink as part of the Flink Kubernetes operator. I've outlined an approach
> here which I find promising:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-271%3A+Autoscaling
>
> I've been discussing this approach with some of the operator contributors:
> Gyula, Marton, Matyas, and Thomas (all in CC). We started prototyping an
> implementation based on the current FLIP design. If that goes well, we
> would like to contribute this to Flink based on the results of the
> discussion here.
>
> I'm curious to hear your thoughts.
>
> -Max
>


Re: [jira] [Created] (FLINK-28372) Investigate Akka Artery

2022-11-05 Thread Martijn Visser
Hi Razin,

https://issues.apache.org/jira/browse/FLINK-29281 is the ticket to keep an
eye on regarding this topic.

Best regards,

Martijn

On Fri, Nov 4, 2022 at 11:11 PM Razin Bouzar 
wrote:

> Hi all,
>
> Has there been any further work or discussions around this issue? The Netty
> security vulnerabilities in 3.10.6 are of some concern.
>
> On Mon, Jul 4, 2022 at 3:45 AM Chesnay Schepler (Jira) 
> wrote:
>
> > Chesnay Schepler created FLINK-28372:
> > 
> >
> >  Summary: Investigate Akka Artery
> >  Key: FLINK-28372
> >  URL: https://issues.apache.org/jira/browse/FLINK-28372
> >  Project: Flink
> >   Issue Type: Technical Debt
> >   Components: Runtime / RPC
> > Reporter: Chesnay Schepler
> >
> >
> > Our current Akka setup uses the deprecated netty-based stack. We need to
> > eventually migrate to Akka Artery.
> >
> >
> >
> > --
> > This message was sent by Atlassian Jira
> > (v8.20.10#820010)
> >
>
>
> --
> RAZIN BOUZAR
> Senior Engineer - Monitoring Cloud | Salesforce
> Mobile: 317-502-8995
>
> 
>


Re: [flink] branch master updated: [hotfix][docs] Set proper watermark and description for event-time temporal table join example

2022-11-05 Thread Michael Stollery
unsubscribe

Best Regards,

Michael Stollery


On Sat, Nov 5, 2022 at 6:17 AM  wrote:

> This is an automated email from the ASF dual-hosted git repository.
>
> leonard pushed a commit to branch master
> in repository https://gitbox.apache.org/repos/asf/flink.git
>
>
> The following commit(s) were added to refs/heads/master by this push:
>  new 6a76233bef3 [hotfix][docs] Set proper watermark and description
> for event-time temporal table join example
> 6a76233bef3 is described below
>
> commit 6a76233bef3ab2b11952a96013604d8562496353
> Author: Jing Ge 
> AuthorDate: Sat Nov 5 14:16:55 2022 +0100
>
> [hotfix][docs] Set proper watermark and description for event-time
> temporal table join example
> ---
>  docs/content.zh/docs/dev/table/sql/queries/joins.md | 8 +---
>  docs/content/docs/dev/table/sql/queries/joins.md| 8 +---
>  2 files changed, 10 insertions(+), 6 deletions(-)
>
> diff --git a/docs/content.zh/docs/dev/table/sql/queries/joins.md
> b/docs/content.zh/docs/dev/table/sql/queries/joins.md
> index 0a5fd96f71c..2ba849153a4 100644
> --- a/docs/content.zh/docs/dev/table/sql/queries/joins.md
> +++ b/docs/content.zh/docs/dev/table/sql/queries/joins.md
> @@ -143,7 +143,7 @@ CREATE TABLE orders (
>  price   DECIMAL(32,2),
>  currencySTRING,
>  order_time  TIMESTAMP(3),
> -WATERMARK FOR order_time AS order_time
> +WATERMARK FOR order_time AS order_time - INTERVAL '15' SECOND
>  ) WITH (/* ... */);
>
>  -- Define a versioned table of currency rates.
> @@ -154,7 +154,7 @@ CREATE TABLE currency_rates (
>  currency STRING,
>  conversion_rate DECIMAL(32, 2),
>  update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp`
> VIRTUAL,
> -WATERMARK FOR update_time AS update_time,
> +WATERMARK FOR update_time AS update_time - INTERVAL '15' SECOND,
>  PRIMARY KEY(currency) NOT ENFORCED
>  ) WITH (
>  'connector' = 'kafka',
> @@ -179,7 +179,9 @@ o_002 12.51  EUR   1.10 12:06:00
>
>  ```
>
> -**Note:** The event-time temporal join is triggered by a watermark from
> the left and right sides; please ensure both sides of the join have set
> watermark correctly.
> +**Note:** The event-time temporal join is triggered by a watermark from
> the left and right sides.
> +The `INTERVAL` time subtraction is used to wait for late events in order
> to make sure the join will meet the expectation.
> +Please ensure both sides of the join have set watermark correctly.
>
>  **Note:** The event-time temporal join requires the primary key contained
> in the equivalence condition of the temporal join condition, e.g., The
> primary key `currency_rates.currency` of table `currency_rates` to be
> constrained in the condition `orders.currency = currency_rates.currency`.
>
> diff --git a/docs/content/docs/dev/table/sql/queries/joins.md
> b/docs/content/docs/dev/table/sql/queries/joins.md
> index 4674ffeeb84..97f672ab8ac 100644
> --- a/docs/content/docs/dev/table/sql/queries/joins.md
> +++ b/docs/content/docs/dev/table/sql/queries/joins.md
> @@ -143,7 +143,7 @@ CREATE TABLE orders (
>  price   DECIMAL(32,2),
>  currencySTRING,
>  order_time  TIMESTAMP(3),
> -WATERMARK FOR order_time AS order_time
> +WATERMARK FOR order_time AS order_time - INTERVAL '15' SECOND
>  ) WITH (/* ... */);
>
>  -- Define a versioned table of currency rates.
> @@ -154,7 +154,7 @@ CREATE TABLE currency_rates (
>  currency STRING,
>  conversion_rate DECIMAL(32, 2),
>  update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp`
> VIRTUAL,
> -WATERMARK FOR update_time AS update_time,
> +WATERMARK FOR update_time AS update_time - INTERVAL '15' SECOND,
>  PRIMARY KEY(currency) NOT ENFORCED
>  ) WITH (
> 'connector' = 'kafka',
> @@ -179,7 +179,9 @@ o_002 12.51  EUR   1.10 12:06:00
>
>  ```
>
> -**Note:** The event-time temporal join is triggered by a watermark from
> the left and right sides; please ensure both sides of the join have set
> watermark correctly.
> +**Note:** The event-time temporal join is triggered by a watermark from
> the left and right sides.
> +The `INTERVAL` time subtraction is used to wait for late events in order
> to make sure the join will meet the expectation.
> +Please ensure both sides of the join have set watermark correctly.
>
>  **Note:** The event-time temporal join requires the primary key contained
> in the equivalence condition of the temporal join condition, e.g., The
> primary key `currency_rates.currency` of table `currency_rates` to be
> constrained in the condition `orders.currency = currency_rates.currency`.
>
>
>


Re: [flink] branch master updated: [hotfix][docs] Set proper watermark and description for event-time temporal table join example

2022-11-05 Thread Leonard Xu
Hi, Michael

Please send an email to commits-unsubscr...@flink.apache.org if you want to 
unsubscribe from comm...@flink.apache.org; you can refer to [1] for more 
details.

Best,
Leonard
[1]https://flink.apache.org/community.html#mailing-lists


> On Nov 5, 2022, at 10:04 PM, Michael Stollery wrote:
> 
> unsubscribe
> 
> Best Regards,
> 
> Michael Stollery
> 
> 
> On Sat, Nov 5, 2022 at 6:17 AM  wrote:
> 
>> This is an automated email from the ASF dual-hosted git repository.
>> 
>> leonard pushed a commit to branch master
>> in repository https://gitbox.apache.org/repos/asf/flink.git
>> 
>> 
>> The following commit(s) were added to refs/heads/master by this push:
>> new 6a76233bef3 [hotfix][docs] Set proper watermark and description
>> for event-time temporal table join example
>> 6a76233bef3 is described below
>> 
>> commit 6a76233bef3ab2b11952a96013604d8562496353
>> Author: Jing Ge 
>> AuthorDate: Sat Nov 5 14:16:55 2022 +0100
>> 
>>[hotfix][docs] Set proper watermark and description for event-time
>> temporal table join example
>> ---
>> docs/content.zh/docs/dev/table/sql/queries/joins.md | 8 +---
>> docs/content/docs/dev/table/sql/queries/joins.md| 8 +---
>> 2 files changed, 10 insertions(+), 6 deletions(-)
>> 
>> diff --git a/docs/content.zh/docs/dev/table/sql/queries/joins.md
>> b/docs/content.zh/docs/dev/table/sql/queries/joins.md
>> index 0a5fd96f71c..2ba849153a4 100644
>> --- a/docs/content.zh/docs/dev/table/sql/queries/joins.md
>> +++ b/docs/content.zh/docs/dev/table/sql/queries/joins.md
>> @@ -143,7 +143,7 @@ CREATE TABLE orders (
>> price   DECIMAL(32,2),
>> currencySTRING,
>> order_time  TIMESTAMP(3),
>> -WATERMARK FOR order_time AS order_time
>> +WATERMARK FOR order_time AS order_time - INTERVAL '15' SECOND
>> ) WITH (/* ... */);
>> 
>> -- Define a versioned table of currency rates.
>> @@ -154,7 +154,7 @@ CREATE TABLE currency_rates (
>> currency STRING,
>> conversion_rate DECIMAL(32, 2),
>> update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp`
>> VIRTUAL,
>> -WATERMARK FOR update_time AS update_time,
>> +WATERMARK FOR update_time AS update_time - INTERVAL '15' SECOND,
>> PRIMARY KEY(currency) NOT ENFORCED
>> ) WITH (
>> 'connector' = 'kafka',
>> @@ -179,7 +179,9 @@ o_002 12.51  EUR   1.10 12:06:00
>> 
>> ```
>> 
>> -**Note:** The event-time temporal join is triggered by a watermark from
>> the left and right sides; please ensure both sides of the join have set
>> watermark correctly.
>> +**Note:** The event-time temporal join is triggered by a watermark from
>> the left and right sides.
>> +The `INTERVAL` time subtraction is used to wait for late events in order
>> to make sure the join will meet the expectation.
>> +Please ensure both sides of the join have set watermark correctly.
>> 
>> **Note:** The event-time temporal join requires the primary key contained
>> in the equivalence condition of the temporal join condition, e.g., The
>> primary key `currency_rates.currency` of table `currency_rates` to be
>> constrained in the condition `orders.currency = currency_rates.currency`.
>> 
>> diff --git a/docs/content/docs/dev/table/sql/queries/joins.md
>> b/docs/content/docs/dev/table/sql/queries/joins.md
>> index 4674ffeeb84..97f672ab8ac 100644
>> --- a/docs/content/docs/dev/table/sql/queries/joins.md
>> +++ b/docs/content/docs/dev/table/sql/queries/joins.md
>> @@ -143,7 +143,7 @@ CREATE TABLE orders (
>> price   DECIMAL(32,2),
>> currencySTRING,
>> order_time  TIMESTAMP(3),
>> -WATERMARK FOR order_time AS order_time
>> +WATERMARK FOR order_time AS order_time - INTERVAL '15' SECOND
>> ) WITH (/* ... */);
>> 
>> -- Define a versioned table of currency rates.
>> @@ -154,7 +154,7 @@ CREATE TABLE currency_rates (
>> currency STRING,
>> conversion_rate DECIMAL(32, 2),
>> update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp`
>> VIRTUAL,
>> -WATERMARK FOR update_time AS update_time,
>> +WATERMARK FOR update_time AS update_time - INTERVAL '15' SECOND,
>> PRIMARY KEY(currency) NOT ENFORCED
>> ) WITH (
>>'connector' = 'kafka',
>> @@ -179,7 +179,9 @@ o_002 12.51  EUR   1.10 12:06:00
>> 
>> ```
>> 
>> -**Note:** The event-time temporal join is triggered by a watermark from
>> the left and right sides; please ensure both sides of the join have set
>> watermark correctly.
>> +**Note:** The event-time temporal join is triggered by a watermark from
>> the left and right sides.
>> +The `INTERVAL` time subtraction is used to wait for late events in order
>> to make sure the join will meet the expectation.
>> +Please ensure both sides of the join have set watermark correctly.
>> 
>> **Note:** The event-time temporal join requires the primary key contained
>> in the equivalence condition of the temporal join condition, e.g., The
>> primary key `currency_rates.currency` of table `currency_rates` to be
>> constrained in the condition

[jira] [Created] (FLINK-29905) Migrate flink-connector-dynamodb-parent to flink-connector-aws-parent

2022-11-05 Thread Danny Cranmer (Jira)
Danny Cranmer created FLINK-29905:
-

 Summary: Migrate flink-connector-dynamodb-parent to 
flink-connector-aws-parent
 Key: FLINK-29905
 URL: https://issues.apache.org/jira/browse/FLINK-29905
 Project: Flink
  Issue Type: Sub-task
Reporter: Danny Cranmer


Update {{flink-connector-dynamodb-parent}} pom to be more general for the AWS 
connector repo



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-05 Thread JunRui Lee
Hi Max,

Thanks for writing this FLIP and initiating the discussion.

I just have a small question after reading the FLIP:

In the document, I didn't find a definition of when to trigger
autoscaling after some JobVertex reaches the threshold. If I missed it,
please let me know.
IIUC, proper triggering rules are necessary to avoid unnecessary
autoscaling caused by temporary spikes in the data;
otherwise each spike would lead to at least two meaningless resubmissions
of the job, which would negatively affect users.

Thanks,
JunRui Lee

On Sat, Nov 5, 2022 at 20:38, Gyula Fóra wrote:

> Hey!
>
> Thanks for the input!
>
> The algorithm does not really differentiate between scaling up or down as
> it’s concerned about finding the right parallelism to match the target
> processing rate with just enough spare capacity.
>
> Let me try to address your specific points:
>
> 1. The backlog growth rate only matters for computing the target processing
> rate for the sources. If the parallelism is high enough and there is no
> back pressure it will be close to 0 so the target rate is the source read
> rate. This is as intended. If we see that the sources are not busy and they
> can read more than enough the algorithm would scale them down.
>
> 2. You are right , it’s dangerous to scale in too much, so we already
> thought about limiting the scale down amount per scaling step/time window
> to give more safety. But we can definitely think about different strategies
> in the future!
>
> The observation regarding max parallelism is very important and we should
> always take that into consideration.
>
> Cheers
> Gyula
>
> On Sat, 5 Nov 2022 at 11:46, Biao Geng  wrote:
>
> > Hi Max,
> >
> > Thanks a lot for the FLIP. It is an extremely attractive feature!
> >
> > Just some follow up questions/thoughts after reading the FLIP:
> > In the doc, the discussion of  the strategy of “scaling out” is thorough
> > and convincing to me but it seems that “scaling down” is less discussed.
> I
> > have 2 cents for this aspect:
> >
> >   1.  For source parallelisms, if the user configure a much larger value
> > than normal, there should be very little pending records though it is
> > possible to get optimized. But IIUC, in current algorithm, we will not
> take
> > actions for this case as the backlog growth rate is almost zero. Is the
> > understanding right?
> >   2.  Compared with “scaling out”, “scaling in” is usually more dangerous
> > as it is more likely to lead to negative influence to the downstream
> jobs.
> > The min/max load bounds should be useful. I am wondering if it is
> possible
> > to have different strategy for “scaling in” to make it more conservative.
> > Or more eagerly, allow custom autoscaling strategy(e.g. time-based
> > strategy).
> >
> > Another side thought is that to recover a job from checkpoint/savepoint,
> > the new parallelism cannot be larger than max parallelism defined in the
> > checkpoint(see this<
> >
> https://github.com/apache/flink/blob/17a782c202c93343b8884cb52f4562f9c4ba593f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L128
> >).
> > Not sure if this limit should be mentioned in the FLIP.
> >
> > Again, thanks for the great work and looking forward to using flink k8s
> > operator with it!
> >
> > Best,
> > Biao Geng
> >
> > From: Maximilian Michels 
> > Date: Saturday, November 5, 2022 at 2:37 AM
> > To: dev 
> > Cc: Gyula Fóra , Thomas Weise ,
> > Marton Balassi , Őrhidi Mátyás <
> > matyas.orh...@gmail.com>
> > Subject: [DISCUSS] FLIP-271: Autoscaling
> > Hi,
> >
> > I would like to kick off the discussion on implementing autoscaling for
> > Flink as part of the Flink Kubernetes operator. I've outlined an approach
> > here which I find promising:
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-271%3A+Autoscaling
> >
> > I've been discussing this approach with some of the operator
> contributors:
> > Gyula, Marton, Matyas, and Thomas (all in CC). We started prototyping an
> > implementation based on the current FLIP design. If that goes well, we
> > would like to contribute this to Flink based on the results of the
> > discussion here.
> >
> > I'm curious to hear your thoughts.
> >
> > -Max
> >
>


[jira] [Created] (FLINK-29906) Address tabs/spaces in checkstyle/spotless

2022-11-05 Thread Danny Cranmer (Jira)
Danny Cranmer created FLINK-29906:
-

 Summary: Address tabs/spaces in checkstyle/spotless
 Key: FLINK-29906
 URL: https://issues.apache.org/jira/browse/FLINK-29906
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / AWS
Reporter: Danny Cranmer


The DynamoDB connector has a mix of tabs and spaces; the quality config does 
not seem to be enforcing anything. The code style guide says we should [use 
spaces|https://flink.apache.org/contributing/code-style-and-quality-formatting.html#whitespaces].

- Update code to use spaces
- Fix quality plugin to reject tabs



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29907) Externalize AWS connectors from Flink core

2022-11-05 Thread Danny Cranmer (Jira)
Danny Cranmer created FLINK-29907:
-

 Summary: Externalize AWS connectors from Flink core
 Key: FLINK-29907
 URL: https://issues.apache.org/jira/browse/FLINK-29907
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / AWS
Reporter: Danny Cranmer
 Fix For: aws-connector-2.0.0


Externalize the following modules from Flink core to the connectors repo:
- {{flink-connector-aws-base}}
- {{flink-connector-kinesis}}
- {{flink-connector-sql-kinesis}}
- {{flink-connector-aws-kinesis-streams}}
- {{flink-connector-sql-aws-kinesis-streams}}
- {{flink-connector-aws-kinesis-firehose}}
- {{flink-connector-sql-aws-kinesis-firehose}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29908) Externalize and configure E2E tests

2022-11-05 Thread Danny Cranmer (Jira)
Danny Cranmer created FLINK-29908:
-

 Summary: Externalize and configure E2E tests
 Key: FLINK-29908
 URL: https://issues.apache.org/jira/browse/FLINK-29908
 Project: Flink
  Issue Type: Sub-task
Reporter: Danny Cranmer


Migrate Amazon Kinesis and Firehose E2E test modules from Flink core to 
flink-connector-aws



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-05 Thread Pedro Silva
Hello,

First of all, thank you for tackling this theme; it will be a massive boon to 
Flink if it gets in.

Following up on JunRui Lee’s question. 
Have you considered triggering metrics collection based on events rather than 
periodic checks?

I.e., if input source lag has been increasing for the past x amount of time, 
trigger a metrics collection to understand what to scale, if anything.

For Kubernetes workloads, KEDA does this: 
https://keda.sh/docs/2.8/scalers/prometheus/
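As a rough illustration of the kind of event-based trigger I mean (purely hypothetical logic, neither KEDA's nor the FLIP's):

```python
def lag_is_increasing(lag_samples, window):
    """True if consumer lag strictly increased across the last `window`
    samples; such a sustained trend, rather than a periodic timer, would
    be what triggers a full metrics collection."""
    recent = lag_samples[-window:]
    if len(recent) < window:
        # Not enough history yet to call it a trend.
        return False
    return all(later > earlier for earlier, later in zip(recent, recent[1:]))

# A monotonically growing lag over the last 3 samples triggers collection.
assert lag_is_increasing([1, 2, 3, 4], 3) is True
assert lag_is_increasing([4, 3, 2, 1], 3) is False
```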

My apologies if the question doesn’t make sense.

Thank you for your time,
Pedro Silva

> 
> On 5 Nov 2022, at 08:09, JunRui Lee  wrote:
> 
> Hi Max,
> 
> Thanks for writing this FLIP and initiating the discussion.
> 
> I just have a small question after reading the FLIP:
> 
> In the document, I didn't find the definition of when to trigger
> autoScaling after some jobVertex reach the threshold. If I missed is,
> please let me know.
> IIUC, the proper triggering rules are necessary to avoid unnecessary
> autoscaling caused by temporary large changes in data,
> and in this case, it will lead to at least two meaningless resubmissions of
> jobs, which will negatively affect users.
> 
> Thanks,
> JunRui Lee
> 
> Gyula Fóra  于2022年11月5日周六 20:38写道:
> 
>> Hey!
>> Thanks for the input!
>> The algorithm does not really differentiate between scaling up or down as
>> it’s concerned about finding the right parallelism to match the target
>> processing rate with just enough spare capacity.
>> Let me try to address your specific points:
>> 1. The backlog growth rate only matters for computing the target processing
>> rate for the sources. If the parallelism is high enough and there is no
>> back pressure it will be close to 0 so the target rate is the source read
>> rate. This is as intended. If we see that the sources are not busy and they
>> can read more than enough the algorithm would scale them down.
>> 2. You are right , it’s dangerous to scale in too much, so we already
>> thought about limiting the scale down amount per scaling step/time window
>> to give more safety. But we can definitely think about different strategies
>> in the future!
>> The observation regarding max parallelism is very important and we should
>> always take that into consideration.
>> Cheers
>> Gyula
>>> On Sat, 5 Nov 2022 at 11:46, Biao Geng  wrote:
>>> Hi Max,
>>> Thanks a lot for the FLIP. It is an extremely attractive feature!
>>> Just some follow up questions/thoughts after reading the FLIP:
>>> In the doc, the discussion of  the strategy of “scaling out” is thorough
>>> and convincing to me but it seems that “scaling down” is less discussed.
>> I
>>> have 2 cents for this aspect:
>>> 1.  For source parallelisms, if the user configure a much larger value
>>> than normal, there should be very little pending records though it is
>>> possible to get optimized. But IIUC, in current algorithm, we will not
>> take
>>> actions for this case as the backlog growth rate is almost zero. Is the
>>> understanding right?
>>> 2.  Compared with “scaling out”, “scaling in” is usually more dangerous
>>> as it is more likely to lead to negative influence to the downstream
>> jobs.
>>> The min/max load bounds should be useful. I am wondering if it is
>> possible
>>> to have different strategy for “scaling in” to make it more conservative.
>>> Or more eagerly, allow custom autoscaling strategy(e.g. time-based
>>> strategy).
>>> Another side thought is that to recover a job from checkpoint/savepoint,
>>> the new parallelism cannot be larger than max parallelism defined in the
>>> checkpoint(see this<
>> https://github.com/apache/flink/blob/17a782c202c93343b8884cb52f4562f9c4ba593f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L128
>>> ).
>>> Not sure if this limit should be mentioned in the FLIP.
>>> Again, thanks for the great work and looking forward to using flink k8s
>>> operator with it!
>>> Best,
>>> Biao Geng
>>> From: Maximilian Michels 
>>> Date: Saturday, November 5, 2022 at 2:37 AM
>>> To: dev 
>>> Cc: Gyula Fóra , Thomas Weise ,
>>> Marton Balassi , Őrhidi Mátyás <
>>> matyas.orh...@gmail.com>
>>> Subject: [DISCUSS] FLIP-271: Autoscaling
>>> Hi,
>>> I would like to kick off the discussion on implementing autoscaling for
>>> Flink as part of the Flink Kubernetes operator. I've outlined an approach
>>> here which I find promising:
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-271%3A+Autoscaling
>>> I've been discussing this approach with some of the operator
>> contributors:
>>> Gyula, Marton, Matyas, and Thomas (all in CC). We started prototyping an
>>> implementation based on the current FLIP design. If that goes well, we
>>> would like to contribute this to Flink based on the results of the
>>> discussion here.
>>> I'm curious to hear your thoughts.
>>> -Max


Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-05 Thread Gyula Fóra
@JunRui:
There are two pieces that prevent scaling on minor load variations. Firstly,
the algorithm / logic is intended to work on metrics averaged over a
configured time window (let's say the last 5 minutes); this smooths out minor
variances and results in more stability. Secondly, in addition to the
utilization target (let's say 75%) there would be a configurable flexibility
bound (for example +/- 10%), and as long as we are within that bound we
wouldn't trigger scaling. We believe these two together should be enough.
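The two stabilization mechanisms above (windowed averaging plus a tolerance band 
around the target) can be sketched roughly as follows; the names and numbers are 
illustrative only, not the operator's actual implementation:

```python
from collections import deque

class ScaleDecider:
    """Illustrative sketch: average utilization over a window and only
    scale when the average leaves a tolerance band around the target."""

    def __init__(self, target=0.75, tolerance=0.10, window_size=5):
        self.target = target
        self.tolerance = tolerance
        self.samples = deque(maxlen=window_size)  # e.g. one sample per minute

    def observe(self, utilization):
        self.samples.append(utilization)

    def decide(self):
        if len(self.samples) < self.samples.maxlen:
            return "collecting"  # not enough history yet
        avg = sum(self.samples) / len(self.samples)
        if avg > self.target + self.tolerance:
            return "scale-up"
        if avg < self.target - self.tolerance:
            return "scale-down"
        return "no-op"
```

A single spike gets averaged away, while sustained load eventually moves the 
windowed average outside the band and triggers a decision.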

@Pedro:
Periodic metric collection is very important to get a good overview of the
system at any given time. Also we need some recent history to make a good
scaling decision. Reflecting on your suggestion, when the input lag
increases we are already too late to scale. Ideally the algorithm would
pick up on a gradual load increase before it even results in actual input
lag so we can always keep our target utilization level.

Cheers,
Gyula



On Sat, Nov 5, 2022 at 5:16 PM Pedro Silva  wrote:

> Hello,
>
> First of all thank you for tackling this theme, it is massive boon to
> Flink if it gets in.
>
> Following up on JunRui Lee’s question.
> Have you considered making metrics collection getting triggered based on
> events rather than periodic checks?
>
> I.e if input source lag is increasing for the past x amount of time ->
> trigger a metric collection to understand what to scale, if anything.
>
> For kubernetes loads there is KEDA that does this:
> https://keda.sh/docs/2.8/scalers/prometheus/
>
> My apologies if the question doesn’t make sense.
>
> Thank you for your time,
> Pedro Silva
>
> >
> > On 5 Nov 2022, at 08:09, JunRui Lee  wrote:
> >
> > Hi Max,
> >
> > Thanks for writing this FLIP and initiating the discussion.
> >
> > I just have a small question after reading the FLIP:
> >
> > In the document, I didn't find the definition of when to trigger
> > autoScaling after some jobVertex reach the threshold. If I missed is,
> > please let me know.
> > IIUC, the proper triggering rules are necessary to avoid unnecessary
> > autoscaling caused by temporary large changes in data,
> > and in this case, it will lead to at least two meaningless resubmissions
> of
> > jobs, which will negatively affect users.
> >
> > Thanks,
> > JunRui Lee
> >
> > Gyula Fóra  于2022年11月5日周六 20:38写道:
> >
> >> Hey!
> >> Thanks for the input!
> >> The algorithm does not really differentiate between scaling up or down
> as
> >> it’s concerned about finding the right parallelism to match the target
> >> processing rate with just enough spare capacity.
> >> Let me try to address your specific points:
> >> 1. The backlog growth rate only matters for computing the target
> processing
> >> rate for the sources. If the parallelism is high enough and there is no
> >> back pressure it will be close to 0 so the target rate is the source
> read
> >> rate. This is as intended. If we see that the sources are not busy and
> they
> >> can read more than enough the algorithm would scale them down.
> >> 2. You are right , it’s dangerous to scale in too much, so we already
> >> thought about limiting the scale down amount per scaling step/time
> window
> >> to give more safety. But we can definitely think about different
> strategies
> >> in the future!
> >> The observation regarding max parallelism is very important and we
> should
> >> always take that into consideration.
> >> Cheers
> >> Gyula
> >>> On Sat, 5 Nov 2022 at 11:46, Biao Geng  wrote:
> >>> Hi Max,
> >>> Thanks a lot for the FLIP. It is an extremely attractive feature!
> >>> Just some follow up questions/thoughts after reading the FLIP:
> >>> In the doc, the discussion of  the strategy of “scaling out” is
> thorough
> >>> and convincing to me but it seems that “scaling down” is less
> discussed.
> >> I
> >>> have 2 cents for this aspect:
> >>> 1.  For source parallelisms, if the user configure a much larger value
> >>> than normal, there should be very little pending records though it is
> >>> possible to get optimized. But IIUC, in current algorithm, we will not
> >> take
> >>> actions for this case as the backlog growth rate is almost zero. Is the
> >>> understanding right?
> >>> 2.  Compared with “scaling out”, “scaling in” is usually more dangerous
> >>> as it is more likely to lead to negative influence to the downstream
> >> jobs.
> >>> The min/max load bounds should be useful. I am wondering if it is
> >> possible
> >>> to have different strategy for “scaling in” to make it more
> conservative.
> >>> Or more eagerly, allow custom autoscaling strategy(e.g. time-based
> >>> strategy).
> >>> Another side thought is that to recover a job from
> checkpoint/savepoint,
> >>> the new parallelism cannot be larger than max parallelism defined in
> the
> >>> checkpoint(see this<
> >>
> https://github.com/apache/flink/blob/17a782c202c93343b8884cb52f4562f9c4ba593f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L128
> >>> ).
> >>> Not sure if this limit should

[jira] [Created] (FLINK-29909) Standardise connector package names

2022-11-05 Thread Hong Liang Teoh (Jira)
Hong Liang Teoh created FLINK-29909:
---

 Summary: Standardise connector package names
 Key: FLINK-29909
 URL: https://issues.apache.org/jira/browse/FLINK-29909
 Project: Flink
  Issue Type: Sub-task
Reporter: Hong Liang Teoh








Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-05 Thread Pedro Silva
@Gyula,

Thank you for your reply; the answer makes perfect sense. I have a follow-up, if 
that's ok.

IIUC this FLIP uses metrics that relate to backpressure at an operator level 
(records in vs out, busy time etc…).
Could the FLIP also be used to auto-scale based on state-level metrics at an 
operator level? 

I.e: keyed state growing too large for a given operator and needing to be 
distributed across more instances of that operator.

Cheers,
Pedro

> 
> On 5 Nov 2022, at 09:47, Gyula Fóra  wrote:
> 
> @JunRui:
> There are 2 pieces that prevent scaling on minor load variations. Firstly
> he algorithm / logic is intended to work on metrics averaged on a
> configured time window (let's say last 5 minutes), this smoothens minor
> variances and results in more stability. Secondly, in addition to the
> utilization target (let's say 75%) there would be a configurable flexible
> bound (for example +/- 10%) and as long as we are within the bound we
> wouldn't trigger scaling. These 2 together should be enough we believe.
> 
> @Pedro:
> Periodic metric collection is very important to get a good overview of the
> system at any given time. Also we need some recent history to make a good
> scaling decision. Reflecting on your suggestion, when the input lag
> increases we are already too late to scale. Ideally the algorithm would
> pick up on a gradual load increase before it even results in actual input
> lag so we can always keep our target utilization level.
> 
> Cheers,
> Gyula
> 
> 
> 
>> On Sat, Nov 5, 2022 at 5:16 PM Pedro Silva  wrote:
>> 
>> Hello,
>> 
>> First of all thank you for tackling this theme, it is massive boon to
>> Flink if it gets in.
>> 
>> Following up on JunRui Lee’s question.
>> Have you considered making metrics collection getting triggered based on
>> events rather than periodic checks?
>> 
>> I.e if input source lag is increasing for the past x amount of time ->
>> trigger a metric collection to understand what to scale, if anything.
>> 
>> For kubernetes loads there is KEDA that does this:
>> https://keda.sh/docs/2.8/scalers/prometheus/
>> 
>> My apologies if the question doesn’t make sense.
>> 
>> Thank you for your time,
>> Pedro Silva
>> 
>>> 
 On 5 Nov 2022, at 08:09, JunRui Lee  wrote:
>>> 
>>> Hi Max,
>>> 
>>> Thanks for writing this FLIP and initiating the discussion.
>>> 
>>> I just have a small question after reading the FLIP:
>>> 
>>> In the document, I didn't find the definition of when to trigger
>>> autoScaling after some jobVertex reach the threshold. If I missed is,
>>> please let me know.
>>> IIUC, the proper triggering rules are necessary to avoid unnecessary
>>> autoscaling caused by temporary large changes in data,
>>> and in this case, it will lead to at least two meaningless resubmissions
>> of
>>> jobs, which will negatively affect users.
>>> 
>>> Thanks,
>>> JunRui Lee
>>> 
>>> Gyula Fóra  于2022年11月5日周六 20:38写道:
>>> 
 Hey!
 Thanks for the input!
 The algorithm does not really differentiate between scaling up or down
>> as
 it’s concerned about finding the right parallelism to match the target
 processing rate with just enough spare capacity.
 Let me try to address your specific points:
 1. The backlog growth rate only matters for computing the target
>> processing
 rate for the sources. If the parallelism is high enough and there is no
 back pressure it will be close to 0 so the target rate is the source
>> read
 rate. This is as intended. If we see that the sources are not busy and
>> they
 can read more than enough the algorithm would scale them down.
 2. You are right , it’s dangerous to scale in too much, so we already
 thought about limiting the scale down amount per scaling step/time
>> window
 to give more safety. But we can definitely think about different
>> strategies
 in the future!
 The observation regarding max parallelism is very important and we
>> should
 always take that into consideration.
 Cheers
 Gyula
> On Sat, 5 Nov 2022 at 11:46, Biao Geng  wrote:
> Hi Max,
> Thanks a lot for the FLIP. It is an extremely attractive feature!
> Just some follow up questions/thoughts after reading the FLIP:
> In the doc, the discussion of  the strategy of “scaling out” is
>> thorough
> and convincing to me but it seems that “scaling down” is less
>> discussed.
 I
> have 2 cents for this aspect:
> 1.  For source parallelisms, if the user configure a much larger value
> than normal, there should be very little pending records though it is
> possible to get optimized. But IIUC, in current algorithm, we will not
 take
> actions for this case as the backlog growth rate is almost zero. Is the
> understanding right?
> 2.  Compared with “scaling out”, “scaling in” is usually more dangerous
> as it is more likely to lead to negative influence to the downstream
 jobs.
> The min/max load bounds should be useful. I am won

Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-05 Thread Zheng Yu Chen
Hi Max
Thank you for driving this FLIP. I have some advice:

Could we have not only the (on/off) switch, but also one more option, (advise)?
When the user enables (advise), autoscaling is not actually performed; instead,
the tuning suggestions are only output as notifications for the user's
reference, and it is up to the user to decide whether to apply the parallelism
adjustment. I believe this would be very useful in the debugging or observation
phase: once the user has observed the suggestions for a certain period of time
and finds them sound, they can then turn the switch on.

At the same time, I found that FFA 2020 had a related Netflix talk discussing
automatic tuning: "Autoscaling Flink at Netflix" by Timothy Farkas. This may be
helpful for us in completing this feature.

That talk describes using prediction functions to forecast the job's operator
traffic. Could we provide some interfaces for users to customize and implement
their own tuning algorithms?
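To make the idea concrete, a user-facing hook for custom tuning algorithms might 
look something like the sketch below (a purely hypothetical interface for 
discussion; nothing like this exists in the operator today). In an "advise" mode 
the operator would only report the suggestions instead of applying them:

```python
class ScalingAdvisor:
    """Hypothetical plug-in point for custom tuning algorithms."""

    def advise(self, metrics):
        """Return a map of vertex -> suggested parallelism."""
        raise NotImplementedError

class UtilizationAdvisor(ScalingAdvisor):
    """Simple example: size each vertex so its busy ratio lands on target."""

    def __init__(self, target=0.75):
        self.target = target

    def advise(self, metrics):
        suggestions = {}
        for vertex, m in metrics.items():
            # Parallelism needed to bring busy_ratio down to the target.
            needed = m["current_parallelism"] * m["busy_ratio"] / self.target
            suggestions[vertex] = max(1, round(needed))
        return suggestions
```

In advise mode the operator would merely surface the returned map (e.g. as a 
Kubernetes event or log line); with the switch fully on, it would apply it.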


Maximilian Michels  于2022年11月5日周六 02:37写道:

> Hi,
>
> I would like to kick off the discussion on implementing autoscaling for
> Flink as part of the Flink Kubernetes operator. I've outlined an approach
> here which I find promising:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-271%3A+Autoscaling
>
> I've been discussing this approach with some of the operator contributors:
> Gyula, Marton, Matyas, and Thomas (all in CC). We started prototyping an
> implementation based on the current FLIP design. If that goes well, we
> would like to contribute this to Flink based on the results of the
> discussion here.
>
> I'm curious to hear your thoughts.
>
> -Max
>


-- 
Best

ConradJam