[PR] [FLINK-35868][mongo] Add MongoDB 6.0 & 7.0 tests [flink-cdc]

2024-07-23 Thread via GitHub


yuxiqian opened a new pull request, #3489:
URL: https://github.com/apache/flink-cdc/pull/3489

   This closes FLINK-35868.
   
   It allows Mongo CDC test cases to run on 6.0.16 & 7.0.12 (and legacy 5.0.2).
   
   Notice: since JUnit doesn't allow parameterized `@ClassRule` or static 
fields, containers must be created and destroyed before and after each test 
case, which significantly slows down testing (~50 min.). More discussion is 
required on this.
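
   For illustration only (this is not the PR's actual code): a minimal sketch of 
the per-test-method container lifecycle described above, assuming Testcontainers' 
`MongoDBContainer` and the JUnit 4 `Parameterized` runner.

{code:java}
// Sketch only: JUnit 4 cannot parameterize @ClassRule / static fields, so the
// container is started in @Before and stopped in @After for every test method.
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.testcontainers.containers.MongoDBContainer;

import java.util.Arrays;
import java.util.Collection;

@RunWith(Parameterized.class)
public class MongoVersionedITCase {

    @Parameterized.Parameters(name = "mongo:{0}")
    public static Collection<String> versions() {
        return Arrays.asList("5.0.2", "6.0.16", "7.0.12");
    }

    @Parameterized.Parameter
    public String version;

    private MongoDBContainer container;

    @Before
    public void startContainer() {
        // A fresh container per test method keeps parameterization correct,
        // but the repeated start/stop is what makes the whole run slow.
        container = new MongoDBContainer("mongo:" + version);
        container.start();
    }

    @After
    public void stopContainer() {
        container.stop();
    }

    @Test
    public void connectorWorksAgainstThisServerVersion() {
        // A real test would run the Mongo CDC source against
        // container.getConnectionString(); omitted here.
    }
}
{code}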


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35868][mongo] Add MongoDB 6.0 & 7.0 tests [flink-cdc]

2024-07-23 Thread via GitHub


yuxiqian commented on PR #3489:
URL: https://github.com/apache/flink-cdc/pull/3489#issuecomment-2244493624

   @leonardBang @Jiabao-Sun PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35868) Bump Mongo driver version to support Mongo 7.0+

2024-07-23 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35868:
---
Labels: pull-request-available  (was: )

> Bump Mongo driver version to support Mongo 7.0+
> ---
>
> Key: FLINK-35868
> URL: https://issues.apache.org/jira/browse/FLINK-35868
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: yux
>Priority: Major
>  Labels: pull-request-available
>
> Currently, the MongoDB CDC connector depends on mongodb-driver v4.9.1, which 
> doesn't support Mongo Server 7.0+ [1]. Upgrading the dependency version would 
> be nice, since Mongo 7.0 was released nearly a year ago.
> [1] https://www.mongodb.com/docs/drivers/java/sync/current/compatibility/
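
For illustration only (this probe is not part of the connector; it assumes the 
mongodb-driver-sync artifact on the classpath and a reachable MongoDB server): 
a minimal check of which server version a given driver build connects to.

{code:java}
// Hypothetical probe, not connector code: connect with the sync driver and
// print the server version reported by the buildInfo command.
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import org.bson.Document;

public class MongoServerVersionProbe {
    public static void main(String[] args) {
        String uri = args.length > 0 ? args[0] : "mongodb://localhost:27017";
        try (MongoClient client = MongoClients.create(uri)) {
            Document buildInfo =
                    client.getDatabase("admin").runCommand(new Document("buildInfo", 1));
            System.out.println("Server reports MongoDB " + buildInfo.getString("version"));
        }
    }
}
{code}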



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35876) mysql cdc may change from snapshot mode to binlog mode failed when cpu load of mysql server is 100%

2024-07-23 Thread Youle (Jira)
Youle created FLINK-35876:
-

 Summary: mysql cdc may change from snapshot mode to binlog mode 
failed when cpu load of mysql server is 100%
 Key: FLINK-35876
 URL: https://issues.apache.org/jira/browse/FLINK-35876
 Project: Flink
  Issue Type: Bug
Reporter: Youle






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35876) mysql cdc may change from snapshot mode to binlog mode failed when cpu load of mysql server is 100%

2024-07-23 Thread Youle (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Youle updated FLINK-35876:
--
 Attachment: image-2024-07-23-15-47-10-376.png
Description: 
I use MySQL CDC to read data from MySQL and write it to Doris. MySQL CDC may 
fail to switch from snapshot mode to binlog mode when I run two Flink jobs 
(source parallelism 20 each) and the CPU load of the MySQL server is 100%. It 
cannot switch even after the CPU load drops; it just hangs and never consumes 
the latest binlog data. When I run only one Flink job (source parallelism 20) 
and the CPU load of the MySQL server stays below 100%, the problem never occurs.

Is there anything that can be done to avoid this problem if we cannot guarantee 
that the CPU load always stays in a healthy state? 

!image-2024-07-23-15-47-10-376.png!

> mysql cdc may change from snapshot mode to binlog mode failed when cpu load 
> of mysql server is 100%
> ---
>
> Key: FLINK-35876
> URL: https://issues.apache.org/jira/browse/FLINK-35876
> Project: Flink
>  Issue Type: Bug
>Reporter: Youle
>Priority: Major
> Attachments: image-2024-07-23-15-47-10-376.png
>
>
> I use MySQL CDC to read data from MySQL and write it to Doris. MySQL CDC may 
> fail to switch from snapshot mode to binlog mode when I run two Flink jobs 
> (source parallelism 20 each) and the CPU load of the MySQL server is 100%. It 
> cannot switch even after the CPU load drops; it just hangs and never consumes 
> the latest binlog data. When I run only one Flink job (source parallelism 20) 
> and the CPU load of the MySQL server stays below 100%, the problem never occurs.
> Is there anything that can be done to avoid this problem if we cannot 
> guarantee that the CPU load always stays in a healthy state? 
> !image-2024-07-23-15-47-10-376.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35876) mysql cdc may change from snapshot mode to binlog mode failed when cpu load of mysql server is 100%

2024-07-23 Thread Youle (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Youle updated FLINK-35876:
--
Description: 
I use MySQL CDC to read data from MySQL and write it to Doris. MySQL CDC may 
fail to switch from snapshot mode to binlog mode when I run two Flink jobs 
(source parallelism 20 each) and the CPU load of the MySQL server is 100%. It 
cannot switch even after the CPU load drops; it just hangs, never consumes the 
latest binlog data, and there is no exception information in the logs. When I 
run only one Flink job (source parallelism 20) and the CPU load of the MySQL 
server stays below 100%, the problem never occurs.

Is there anything that can be done to avoid this problem if we cannot guarantee 
that the CPU load always stays in a healthy state? 

!image-2024-07-23-15-47-10-376.png!

  was:
I use MySQL CDC to read data from MySQL and write it to Doris. MySQL CDC may 
fail to switch from snapshot mode to binlog mode when I run two Flink jobs 
(source parallelism 20 each) and the CPU load of the MySQL server is 100%. It 
cannot switch even after the CPU load drops; it just hangs and never consumes 
the latest binlog data. When I run only one Flink job (source parallelism 20) 
and the CPU load of the MySQL server stays below 100%, the problem never occurs.

Is there anything that can be done to avoid this problem if we cannot guarantee 
that the CPU load always stays in a healthy state? 

!image-2024-07-23-15-47-10-376.png!


> mysql cdc may change from snapshot mode to binlog mode failed when cpu load 
> of mysql server is 100%
> ---
>
> Key: FLINK-35876
> URL: https://issues.apache.org/jira/browse/FLINK-35876
> Project: Flink
>  Issue Type: Bug
>Reporter: Youle
>Priority: Major
> Attachments: image-2024-07-23-15-47-10-376.png
>
>
> I use MySQL CDC to read data from MySQL and write it to Doris. MySQL CDC may 
> fail to switch from snapshot mode to binlog mode when I run two Flink jobs 
> (source parallelism 20 each) and the CPU load of the MySQL server is 100%. It 
> cannot switch even after the CPU load drops; it just hangs, never consumes the 
> latest binlog data, and there is no exception information in the logs. When I 
> run only one Flink job (source parallelism 20) and the CPU load of the MySQL 
> server stays below 100%, the problem never occurs.
> Is there anything that can be done to avoid this problem if we cannot 
> guarantee that the CPU load always stays in a healthy state? 
> !image-2024-07-23-15-47-10-376.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35876) mysql cdc may change from snapshot mode to binlog mode failed when cpu load of mysql server is 100%

2024-07-23 Thread Youle (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Youle updated FLINK-35876:
--
Description: 
I use MySQL CDC to read data from MySQL and write it to Doris. MySQL CDC may 
fail to switch from snapshot mode to binlog mode when I run two Flink jobs 
(source parallelism 20 each) and the CPU load of the MySQL server is 100%. It 
cannot switch even after the CPU load drops; it just hangs, never consumes the 
latest binlog data, and there is no exception information in the logs. When I 
run only one Flink job (source parallelism 20) and the CPU load of the MySQL 
server stays below 100%, the problem never occurs.

MySQL table: about 250 million rows
Flink version: 1.18.1
MySQL CDC connector version: 3.1.0
Doris connector version: 1.18-1.6.2

Is there anything that can be done to avoid this problem if we cannot guarantee 
that the CPU load always stays in a healthy state? 

!image-2024-07-23-15-47-10-376.png!

  was:
I use MySQL CDC to read data from MySQL and write it to Doris. MySQL CDC may 
fail to switch from snapshot mode to binlog mode when I run two Flink jobs 
(source parallelism 20 each) and the CPU load of the MySQL server is 100%. It 
cannot switch even after the CPU load drops; it just hangs, never consumes the 
latest binlog data, and there is no exception information in the logs. When I 
run only one Flink job (source parallelism 20) and the CPU load of the MySQL 
server stays below 100%, the problem never occurs.

Is there anything that can be done to avoid this problem if we cannot guarantee 
that the CPU load always stays in a healthy state? 

!image-2024-07-23-15-47-10-376.png!


> mysql cdc may change from snapshot mode to binlog mode failed when cpu load 
> of mysql server is 100%
> ---
>
> Key: FLINK-35876
> URL: https://issues.apache.org/jira/browse/FLINK-35876
> Project: Flink
>  Issue Type: Bug
>Reporter: Youle
>Priority: Major
> Attachments: image-2024-07-23-15-47-10-376.png
>
>
> I use MySQL CDC to read data from MySQL and write it to Doris. MySQL CDC may 
> fail to switch from snapshot mode to binlog mode when I run two Flink jobs 
> (source parallelism 20 each) and the CPU load of the MySQL server is 100%. It 
> cannot switch even after the CPU load drops; it just hangs, never consumes the 
> latest binlog data, and there is no exception information in the logs. When I 
> run only one Flink job (source parallelism 20) and the CPU load of the MySQL 
> server stays below 100%, the problem never occurs.
> MySQL table: about 250 million rows
> Flink version: 1.18.1
> MySQL CDC connector version: 3.1.0
> Doris connector version: 1.18-1.6.2
>  
> Is there anything that can be done to avoid this problem if we cannot 
> guarantee that the CPU load always stays in a healthy state? 
> !image-2024-07-23-15-47-10-376.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35877) Shade protobuf in flink

2024-07-23 Thread zhuanshenbsj1 (Jira)
zhuanshenbsj1 created FLINK-35877:
-

 Summary: Shade protobuf in flink
 Key: FLINK-35877
 URL: https://issues.apache.org/jira/browse/FLINK-35877
 Project: Flink
  Issue Type: Improvement
  Components: BuildSystem / Shaded
Affects Versions: 1.19.2
Reporter: zhuanshenbsj1
 Fix For: 1.19.2


Shade the classes in protobuf to avoid class conflict.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35876) mysql cdc may change from snapshot mode to binlog mode failed when cpu load of mysql server is 100%

2024-07-23 Thread Youle (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Youle updated FLINK-35876:
--
  Component/s: Flink CDC
Affects Version/s: cdc-3.1.0

> mysql cdc may change from snapshot mode to binlog mode failed when cpu load 
> of mysql server is 100%
> ---
>
> Key: FLINK-35876
> URL: https://issues.apache.org/jira/browse/FLINK-35876
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Youle
>Priority: Major
> Attachments: image-2024-07-23-15-47-10-376.png
>
>
> I use MySQL CDC to read data from MySQL and write it to Doris. MySQL CDC may 
> fail to switch from snapshot mode to binlog mode when I run two Flink jobs 
> (source parallelism 20 each) and the CPU load of the MySQL server is 100%. It 
> cannot switch even after the CPU load drops; it just hangs, never consumes the 
> latest binlog data, and there is no exception information in the logs. When I 
> run only one Flink job (source parallelism 20) and the CPU load of the MySQL 
> server stays below 100%, the problem never occurs.
> MySQL table: about 250 million rows
> Flink version: 1.18.1
> MySQL CDC connector version: 3.1.0
> Doris connector version: 1.18-1.6.2
>  
> Is there anything that can be done to avoid this problem if we cannot 
> guarantee that the CPU load always stays in a healthy state? 
> !image-2024-07-23-15-47-10-376.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-35877] Shade protobuf in flink [flink]

2024-07-23 Thread via GitHub


zhuanshenbsj1 opened a new pull request, #25112:
URL: https://github.com/apache/flink/pull/25112

   
   
   ## What is the purpose of the change
   
   Shade the classes in protobuf to avoid class conflict.
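   
   For illustration only (the relocated package name below is hypothetical, not 
Flink's actual relocation prefix): a tiny probe showing what shading/relocation 
changes at runtime, namely which protobuf class names are visible on the 
classpath.

{code:java}
// Hypothetical sketch: with shading, Flink's internal protobuf copy would live
// under a relocated package and no longer clash with a user-provided
// com.google.protobuf version. The relocated name below is made up purely for
// illustration.
public class ProtobufRelocationProbe {
    public static void main(String[] args) {
        String[] candidates = {
            "com.google.protobuf.Message",                         // user / unshaded protobuf
            "org.apache.flink.shaded.com.google.protobuf.Message"  // hypothetical relocated copy
        };
        for (String name : candidates) {
            try {
                Class.forName(name);
                System.out.println("present: " + name);
            } catch (ClassNotFoundException e) {
                System.out.println("absent : " + name);
            }
        }
    }
}
{code}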
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow [the 
conventions for tests defined in our code quality 
guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing).
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35877) Shade protobuf in flink

2024-07-23 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35877:
---
Labels: pull-request-available  (was: )

> Shade protobuf in flink
> ---
>
> Key: FLINK-35877
> URL: https://issues.apache.org/jira/browse/FLINK-35877
> Project: Flink
>  Issue Type: Improvement
>  Components: BuildSystem / Shaded
>Affects Versions: 1.19.2
>Reporter: zhuanshenbsj1
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.19.2
>
>
> Shade the classes in protobuf to avoid class conflict.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35880) CLONE - [Release-1.20] Stage source and binary releases on dist.apache.org

2024-07-23 Thread Weijie Guo (Jira)
Weijie Guo created FLINK-35880:
--

 Summary: CLONE - [Release-1.20] Stage source and binary releases 
on dist.apache.org
 Key: FLINK-35880
 URL: https://issues.apache.org/jira/browse/FLINK-35880
 Project: Flink
  Issue Type: Sub-task
Affects Versions: 1.20.0
Reporter: Weijie Guo
Assignee: Weijie Guo
 Fix For: 1.20.0


Copy the source release to the dev repository of dist.apache.org:
# If you have not already, check out the Flink section of the dev repository on 
dist.apache.org via Subversion. In a fresh directory:
{code:bash}
$ svn checkout https://dist.apache.org/repos/dist/dev/flink --depth=immediates
{code}
# Make a directory for the new release and copy all the artifacts (Flink 
source/binary distributions, hashes, GPG signatures and the python 
subdirectory) into that newly created directory:
{code:bash}
$ mkdir flink/flink-${RELEASE_VERSION}-rc${RC_NUM}
$ mv /tools/releasing/release/* flink/flink-${RELEASE_VERSION}-rc${RC_NUM}
{code}
# Add and commit all the files.
{code:bash}
$ cd flink
flink $ svn add flink-${RELEASE_VERSION}-rc${RC_NUM}
flink $ svn commit -m "Add flink-${RELEASE_VERSION}-rc${RC_NUM}"
{code}
# Verify that files are present under 
[https://dist.apache.org/repos/dist/dev/flink|https://dist.apache.org/repos/dist/dev/flink].
# Push the release tag if not done already (the following command assumes to be 
called from within the apache/flink checkout):
{code:bash}
$ git push  refs/tags/release-${RELEASE_VERSION}-rc${RC_NUM}
{code}

 

h3. Expectations
 * Maven artifacts deployed to the staging repository of 
[repository.apache.org|https://repository.apache.org/content/repositories/]
 * Source distribution deployed to the dev repository of 
[dist.apache.org|https://dist.apache.org/repos/dist/dev/flink/]
 * Check hashes (e.g. shasum -c *.sha512)
 * Check signatures (e.g. {{{}gpg --verify 
flink-1.2.3-source-release.tar.gz.asc flink-1.2.3-source-release.tar.gz{}}})
 * {{grep}} for legal headers in each file.
 * If time allows check the NOTICE files of the modules whose dependencies have 
been changed in this release in advance, since the license issues from time to 
time pop up during voting. See [Verifying a Flink 
Release|https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+Release]
 "Checking License" section.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35881) [1.20-rc2] Propose a pull request for website updates

2024-07-23 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-35881:
---
Summary: [1.20-rc2] Propose a pull request for website updates  (was: CLONE 
- [Release-1.20] Propose a pull request for website updates)

> [1.20-rc2] Propose a pull request for website updates
> -
>
> Key: FLINK-35881
> URL: https://issues.apache.org/jira/browse/FLINK-35881
> Project: Flink
>  Issue Type: Sub-task
>Affects Versions: 1.17.0
>Reporter: Weijie Guo
>Assignee: Weijie Guo
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> The final step of building the candidate is to propose a website pull request 
> containing the following changes:
>  # update 
> [apache/flink-web:_config.yml|https://github.com/apache/flink-web/blob/asf-site/_config.yml]
>  ## update {{FLINK_VERSION_STABLE}} and {{FLINK_VERSION_STABLE_SHORT}} as 
> required
>  ## update version references in quickstarts ({{{}q/{}}} directory) as 
> required
>  ## (major only) add a new entry to {{flink_releases}} for the release 
> binaries and sources
>  ## (minor only) update the entry for the previous release in the series in 
> {{flink_releases}}
>  ### Please pay notice to the ids assigned to the download entries. They 
> should be unique and reflect their corresponding version number.
>  ## add a new entry to {{release_archive.flink}}
>  # add a blog post announcing the release in _posts
>  # add an organized release notes page under docs/content/release-notes and 
> docs/content.zh/release-notes (like 
> [https://nightlies.apache.org/flink/flink-docs-release-1.15/release-notes/flink-1.15/]).
>  The page is based on the non-empty release notes collected from the issues, 
> and only the issues that affect existing users should be included (e.g., 
> instead of new functionality). It should be in a separate PR since it would 
> be merged to the flink project.
> (!) Don’t merge the PRs before finalizing the release.
>  
> 
> h3. Expectations
>  * Website pull request proposed to list the 
> [release|http://flink.apache.org/downloads.html]
>  * (major only) Check {{docs/config.toml}} to ensure that
>  ** the version constants refer to the new version
>  ** the {{baseurl}} does not point to {{flink-docs-master}}  but 
> {{flink-docs-release-X.Y}} instead



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35881) CLONE - [Release-1.20] Propose a pull request for website updates

2024-07-23 Thread Weijie Guo (Jira)
Weijie Guo created FLINK-35881:
--

 Summary: CLONE - [Release-1.20] Propose a pull request for website 
updates
 Key: FLINK-35881
 URL: https://issues.apache.org/jira/browse/FLINK-35881
 Project: Flink
  Issue Type: Sub-task
Affects Versions: 1.17.0
Reporter: Weijie Guo
Assignee: Weijie Guo
 Fix For: 1.20.0


The final step of building the candidate is to propose a website pull request 
containing the following changes:
 # update 
[apache/flink-web:_config.yml|https://github.com/apache/flink-web/blob/asf-site/_config.yml]
 ## update {{FLINK_VERSION_STABLE}} and {{FLINK_VERSION_STABLE_SHORT}} as 
required
 ## update version references in quickstarts ({{{}q/{}}} directory) as required
 ## (major only) add a new entry to {{flink_releases}} for the release binaries 
and sources
 ## (minor only) update the entry for the previous release in the series in 
{{flink_releases}}
 ### Please pay notice to the ids assigned to the download entries. They should 
be unique and reflect their corresponding version number.
 ## add a new entry to {{release_archive.flink}}
 # add a blog post announcing the release in _posts
 # add an organized release notes page under docs/content/release-notes and 
docs/content.zh/release-notes (like 
[https://nightlies.apache.org/flink/flink-docs-release-1.15/release-notes/flink-1.15/]).
 The page is based on the non-empty release notes collected from the issues, 
and only the issues that affect existing users should be included (e.g., 
instead of new functionality). It should be in a separate PR since it would be 
merged to the flink project.

(!) Don’t merge the PRs before finalizing the release.

 

h3. Expectations
 * Website pull request proposed to list the 
[release|http://flink.apache.org/downloads.html]
 * (major only) Check {{docs/config.toml}} to ensure that
 ** the version constants refer to the new version
 ** the {{baseurl}} does not point to {{flink-docs-master}}  but 
{{flink-docs-release-X.Y}} instead



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35878) Build Release Candidate: 1.20.0-rc2

2024-07-23 Thread Weijie Guo (Jira)
Weijie Guo created FLINK-35878:
--

 Summary: Build Release Candidate: 1.20.0-rc2
 Key: FLINK-35878
 URL: https://issues.apache.org/jira/browse/FLINK-35878
 Project: Flink
  Issue Type: New Feature
Affects Versions: 1.20.0
Reporter: Weijie Guo
Assignee: Weijie Guo
 Fix For: 1.20.0


The core of the release process is the build-vote-fix cycle. Each cycle 
produces one release candidate. The Release Manager repeats this cycle until 
the community approves one release candidate, which is then finalized.

h4. Prerequisites
Set up a few environment variables to simplify Maven commands that follow. This 
identifies the release candidate being built. Start with {{RC_NUM}} equal to 1 
and increment it for each candidate:
{code}
RC_NUM="1"
TAG="release-${RELEASE_VERSION}-rc${RC_NUM}"
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35879) CLONE - [Release-1.20] Build and stage Java and Python artifacts

2024-07-23 Thread Weijie Guo (Jira)
Weijie Guo created FLINK-35879:
--

 Summary: CLONE - [Release-1.20] Build and stage Java and Python 
artifacts
 Key: FLINK-35879
 URL: https://issues.apache.org/jira/browse/FLINK-35879
 Project: Flink
  Issue Type: Sub-task
Affects Versions: 1.20.0
Reporter: Weijie Guo
Assignee: Weijie Guo
 Fix For: 1.20.0


# Create a local release branch ((!) this step can not be skipped for minor 
releases):
{code:bash}
$ cd ./tools
tools/ $ OLD_VERSION=$CURRENT_SNAPSHOT_VERSION NEW_VERSION=$RELEASE_VERSION 
RELEASE_CANDIDATE=$RC_NUM releasing/create_release_branch.sh
{code}
 # Tag the release commit:
{code:bash}
$ git tag -s ${TAG} -m "${TAG}"
{code}
 # We now need to do several things:
 ## Create the source release archive
 ## Deploy jar artefacts to the [Apache Nexus 
Repository|https://repository.apache.org/], which is the staging area for 
deploying the jars to Maven Central
 ## Build PyFlink wheel packages
You might want to create a directory on your local machine for collecting the 
various source and binary releases before uploading them. Creating the binary 
releases is a lengthy process but you can do this on another machine (for 
example, in the "cloud"). When doing this, you can skip signing the release 
files on the remote machine, download them to your local machine and sign them 
there.
 # Build the source release:
{code:bash}
tools $ RELEASE_VERSION=$RELEASE_VERSION releasing/create_source_release.sh
{code}
 # Stage the maven artifacts:
{code:bash}
tools $ releasing/deploy_staging_jars.sh
{code}
Review all staged artifacts ([https://repository.apache.org/]). They should 
contain all relevant parts for each module, including pom.xml, jar, test jar, 
source, test source, javadoc, etc. Carefully review any new artifacts.
 # Close the staging repository on Apache Nexus. When prompted for a 
description, enter “Apache Flink, version X, release candidate Y”.
Then, you need to build the PyFlink wheel packages (since 1.11):
 # Set up an azure pipeline in your own Azure account. You can refer to [Azure 
Pipelines|https://cwiki.apache.org/confluence/display/FLINK/Azure+Pipelines#AzurePipelines-Tutorial:SettingupAzurePipelinesforaforkoftheFlinkrepository]
 for more details on how to set up azure pipeline for a fork of the Flink 
repository. Note that a google cloud mirror in Europe is used for downloading 
maven artifacts, therefore it is recommended to set your [Azure organization 
region|https://docs.microsoft.com/en-us/azure/devops/organizations/accounts/change-organization-location]
 to Europe to speed up the downloads.
 # Push the release candidate branch to your forked personal Flink repository, 
e.g.
{code:bash}
tools $ git push  refs/heads/release-${RELEASE_VERSION}-rc${RC_NUM}:release-${RELEASE_VERSION}-rc${RC_NUM}
{code}
 # Trigger the Azure Pipelines manually to build the PyFlink wheel packages
 ## Go to your Azure Pipelines Flink project → Pipelines
 ## Click the "New pipeline" button on the top right
 ## Select "GitHub" → your GitHub Flink repository → "Existing Azure Pipelines 
YAML file"
 ## Select your branch → Set path to "/azure-pipelines.yaml" → click on 
"Continue" → click on "Variables"
 ## Then click "New Variable" button, fill the name with "MODE", and the value 
with "release". Click "OK" to set the variable and the "Save" button to save 
the variables, then back on the "Review your pipeline" screen click "Run" to 
trigger the build.
 ## You should now see a build where only the "CI build (release)" is running
 # Download the PyFlink wheel packages from the build result page after the 
jobs of "build_wheels mac" and "build_wheels linux" have finished.
 ## Download the PyFlink wheel packages
 ### Open the build result page of the pipeline
 ### Go to the {{Artifacts}} page (build_wheels linux -> 1 artifact)
 ### Click {{wheel_Darwin_build_wheels mac}} and {{wheel_Linux_build_wheels 
linux}} separately to download the zip files
 ## Unzip these two zip files
{code:bash}
$ cd /path/to/downloaded_wheel_packages
$ unzip wheel_Linux_build_wheels\ linux.zip
$ unzip wheel_Darwin_build_wheels\ mac.zip{code}
 ## Create directory {{./dist}} under the directory of {{{}flink-python{}}}:
{code:bash}
$ cd 
$ mkdir flink-python/dist{code}
 ## Move the unzipped wheel packages to the directory of 
{{{}flink-python/dist{}}}:
{code:java}
$ mv /path/to/wheel_Darwin_build_wheels\ mac/* flink-python/dist/
$ mv /path/to/wheel_Linux_build_wheels\ linux/* flink-python/dist/
$ cd tools{code}

Finally, we create the binary convenience release files:
{code:bash}
tools $ RELEASE_VERSION=$RELEASE_VERSION releasing/create_binary_release.sh
{code}
If you want to run this step in parallel on a remote machine you have to make 
the release commit available there (for example by pushing to a repository). 
*This is important: the commit inside the binary builds has to match the commit 
of the so

[jira] [Created] (FLINK-35882) CLONE - [Release-1.20] Vote on the release candidate

2024-07-23 Thread Weijie Guo (Jira)
Weijie Guo created FLINK-35882:
--

 Summary: CLONE - [Release-1.20] Vote on the release candidate
 Key: FLINK-35882
 URL: https://issues.apache.org/jira/browse/FLINK-35882
 Project: Flink
  Issue Type: Sub-task
Affects Versions: 1.17.0
Reporter: Weijie Guo
Assignee: Weijie Guo
 Fix For: 1.17.0


Once you have built and individually reviewed the release candidate, please 
share it for the community-wide review. Please review foundation-wide [voting 
guidelines|http://www.apache.org/foundation/voting.html] for more information.

Start the review-and-vote thread on the dev@ mailing list. Here’s an email 
template; please adjust as you see fit.
{quote}From: Release Manager
To: d...@flink.apache.org
Subject: [VOTE] Release 1.2.3, release candidate #3

Hi everyone,
Please review and vote on the release candidate #3 for the version 1.2.3, as 
follows:
[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)

The complete staging area is available for your review, which includes:
 * JIRA release notes [1],
 * the official Apache source release and binary convenience releases to be 
deployed to dist.apache.org [2], which are signed with the key with fingerprint 
 [3],
 * all artifacts to be deployed to the Maven Central Repository [4],
 * source code tag "release-1.2.3-rc3" [5],
 * website pull request listing the new release and adding announcement blog 
post [6].

The vote will be open for at least 72 hours. It is adopted by majority 
approval, with at least 3 PMC affirmative votes.

Thanks,
Release Manager

[1] link
[2] link
[3] [https://dist.apache.org/repos/dist/release/flink/KEYS]
[4] link
[5] link
[6] link
{quote}
*If there are any issues found in the release candidate, reply on the vote 
thread to cancel the vote.* There’s no need to wait 72 hours. Proceed to the 
Fix Issues step below and address the problem. However, some issues don’t 
require cancellation. For example, if an issue is found in the website pull 
request, just correct it on the spot and the vote can continue as-is.

For cancelling a release, the release manager needs to send an email to the 
release candidate thread, stating that the release candidate is officially 
cancelled. Next, all artifacts created specifically for the RC in the previous 
steps need to be removed:
 * Delete the staging repository in Nexus
 * Remove the source / binary RC files from dist.apache.org
 * Delete the source code tag in git

*If there are no issues, reply on the vote thread to close the voting.* Then, 
tally the votes in a separate email. Here’s an email template; please adjust as 
you see fit.
{quote}From: Release Manager
To: d...@flink.apache.org
Subject: [RESULT] [VOTE] Release 1.2.3, release candidate #3

I'm happy to announce that we have unanimously approved this release.

There are XXX approving votes, XXX of which are binding:
 * approver 1
 * approver 2
 * approver 3
 * approver 4

There are no disapproving votes.

Thanks everyone!
{quote}
 

h3. Expectations
 * Community votes to release the proposed candidate, with at least three 
approving PMC votes

Any issues that are raised till the vote is over should be either resolved or 
moved into the next release (if applicable).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35880) [1.20-rc2] Stage source and binary releases on dist.apache.org

2024-07-23 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-35880:
---
Summary: [1.20-rc2] Stage source and binary releases on dist.apache.org  
(was: CLONE - [Release-1.20] Stage source and binary releases on 
dist.apache.org)

> [1.20-rc2] Stage source and binary releases on dist.apache.org
> --
>
> Key: FLINK-35880
> URL: https://issues.apache.org/jira/browse/FLINK-35880
> Project: Flink
>  Issue Type: Sub-task
>Affects Versions: 1.20.0
>Reporter: Weijie Guo
>Assignee: Weijie Guo
>Priority: Major
> Fix For: 1.20.0
>
>
> Copy the source release to the dev repository of dist.apache.org:
> # If you have not already, check out the Flink section of the dev repository 
> on dist.apache.org via Subversion. In a fresh directory:
> {code:bash}
> $ svn checkout https://dist.apache.org/repos/dist/dev/flink --depth=immediates
> {code}
> # Make a directory for the new release and copy all the artifacts (Flink 
> source/binary distributions, hashes, GPG signatures and the python 
> subdirectory) into that newly created directory:
> {code:bash}
> $ mkdir flink/flink-${RELEASE_VERSION}-rc${RC_NUM}
> $ mv /tools/releasing/release/* flink/flink-${RELEASE_VERSION}-rc${RC_NUM}
> {code}
> # Add and commit all the files.
> {code:bash}
> $ cd flink
> flink $ svn add flink-${RELEASE_VERSION}-rc${RC_NUM}
> flink $ svn commit -m "Add flink-${RELEASE_VERSION}-rc${RC_NUM}"
> {code}
> # Verify that files are present under 
> [https://dist.apache.org/repos/dist/dev/flink|https://dist.apache.org/repos/dist/dev/flink].
> # Push the release tag if not done already (the following command assumes to 
> be called from within the apache/flink checkout):
> {code:bash}
> $ git push  refs/tags/release-${RELEASE_VERSION}-rc${RC_NUM}
> {code}
>  
> 
> h3. Expectations
>  * Maven artifacts deployed to the staging repository of 
> [repository.apache.org|https://repository.apache.org/content/repositories/]
>  * Source distribution deployed to the dev repository of 
> [dist.apache.org|https://dist.apache.org/repos/dist/dev/flink/]
>  * Check hashes (e.g. shasum -c *.sha512)
>  * Check signatures (e.g. {{{}gpg --verify 
> flink-1.2.3-source-release.tar.gz.asc flink-1.2.3-source-release.tar.gz{}}})
>  * {{grep}} for legal headers in each file.
>  * If time allows check the NOTICE files of the modules whose dependencies 
> have been changed in this release in advance, since the license issues from 
> time to time pop up during voting. See [Verifying a Flink 
> Release|https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+Release]
>  "Checking License" section.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35881) [1.20-rc2] Propose a pull request for website updates

2024-07-23 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-35881:
---
Affects Version/s: 1.20.0
   (was: 1.17.0)

> [1.20-rc2] Propose a pull request for website updates
> -
>
> Key: FLINK-35881
> URL: https://issues.apache.org/jira/browse/FLINK-35881
> Project: Flink
>  Issue Type: Sub-task
>Affects Versions: 1.20.0
>Reporter: Weijie Guo
>Assignee: Weijie Guo
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> The final step of building the candidate is to propose a website pull request 
> containing the following changes:
>  # update 
> [apache/flink-web:_config.yml|https://github.com/apache/flink-web/blob/asf-site/_config.yml]
>  ## update {{FLINK_VERSION_STABLE}} and {{FLINK_VERSION_STABLE_SHORT}} as 
> required
>  ## update version references in quickstarts ({{{}q/{}}} directory) as 
> required
>  ## (major only) add a new entry to {{flink_releases}} for the release 
> binaries and sources
>  ## (minor only) update the entry for the previous release in the series in 
> {{flink_releases}}
>  ### Please pay notice to the ids assigned to the download entries. They 
> should be unique and reflect their corresponding version number.
>  ## add a new entry to {{release_archive.flink}}
>  # add a blog post announcing the release in _posts
>  # add an organized release notes page under docs/content/release-notes and 
> docs/content.zh/release-notes (like 
> [https://nightlies.apache.org/flink/flink-docs-release-1.15/release-notes/flink-1.15/]).
>  The page is based on the non-empty release notes collected from the issues, 
> and only the issues that affect existing users should be included (e.g., 
> instead of new functionality). It should be in a separate PR since it would 
> be merged to the flink project.
> (!) Don’t merge the PRs before finalizing the release.
>  
> 
> h3. Expectations
>  * Website pull request proposed to list the 
> [release|http://flink.apache.org/downloads.html]
>  * (major only) Check {{docs/config.toml}} to ensure that
>  ** the version constants refer to the new version
>  ** the {{baseurl}} does not point to {{flink-docs-master}}  but 
> {{flink-docs-release-X.Y}} instead



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35879) [1.20-rc2] Build and stage Java and Python artifacts

2024-07-23 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-35879:
---
Summary: [1.20-rc2] Build and stage Java and Python artifacts  (was: CLONE 
- [Release-1.20] Build and stage Java and Python artifacts)

> [1.20-rc2] Build and stage Java and Python artifacts
> 
>
> Key: FLINK-35879
> URL: https://issues.apache.org/jira/browse/FLINK-35879
> Project: Flink
>  Issue Type: Sub-task
>Affects Versions: 1.20.0
>Reporter: Weijie Guo
>Assignee: Weijie Guo
>Priority: Major
> Fix For: 1.20.0
>
>
> # Create a local release branch ((!) this step can not be skipped for minor 
> releases):
> {code:bash}
> $ cd ./tools
> tools/ $ OLD_VERSION=$CURRENT_SNAPSHOT_VERSION NEW_VERSION=$RELEASE_VERSION 
> RELEASE_CANDIDATE=$RC_NUM releasing/create_release_branch.sh
> {code}
>  # Tag the release commit:
> {code:bash}
> $ git tag -s ${TAG} -m "${TAG}"
> {code}
>  # We now need to do several things:
>  ## Create the source release archive
>  ## Deploy jar artefacts to the [Apache Nexus 
> Repository|https://repository.apache.org/], which is the staging area for 
> deploying the jars to Maven Central
>  ## Build PyFlink wheel packages
> You might want to create a directory on your local machine for collecting the 
> various source and binary releases before uploading them. Creating the binary 
> releases is a lengthy process but you can do this on another machine (for 
> example, in the "cloud"). When doing this, you can skip signing the release 
> files on the remote machine, download them to your local machine and sign 
> them there.
>  # Build the source release:
> {code:bash}
> tools $ RELEASE_VERSION=$RELEASE_VERSION releasing/create_source_release.sh
> {code}
>  # Stage the maven artifacts:
> {code:bash}
> tools $ releasing/deploy_staging_jars.sh
> {code}
> Review all staged artifacts ([https://repository.apache.org/]). They should 
> contain all relevant parts for each module, including pom.xml, jar, test jar, 
> source, test source, javadoc, etc. Carefully review any new artifacts.
>  # Close the staging repository on Apache Nexus. When prompted for a 
> description, enter “Apache Flink, version X, release candidate Y”.
> Then, you need to build the PyFlink wheel packages (since 1.11):
>  # Set up an azure pipeline in your own Azure account. You can refer to 
> [Azure 
> Pipelines|https://cwiki.apache.org/confluence/display/FLINK/Azure+Pipelines#AzurePipelines-Tutorial:SettingupAzurePipelinesforaforkoftheFlinkrepository]
>  for more details on how to set up azure pipeline for a fork of the Flink 
> repository. Note that a google cloud mirror in Europe is used for downloading 
> maven artifacts, therefore it is recommended to set your [Azure organization 
> region|https://docs.microsoft.com/en-us/azure/devops/organizations/accounts/change-organization-location]
>  to Europe to speed up the downloads.
>  # Push the release candidate branch to your forked personal Flink 
> repository, e.g.
> {code:bash}
> tools $ git push  refs/heads/release-${RELEASE_VERSION}-rc${RC_NUM}:release-${RELEASE_VERSION}-rc${RC_NUM}
> {code}
>  # Trigger the Azure Pipelines manually to build the PyFlink wheel packages
>  ## Go to your Azure Pipelines Flink project → Pipelines
>  ## Click the "New pipeline" button on the top right
>  ## Select "GitHub" → your GitHub Flink repository → "Existing Azure 
> Pipelines YAML file"
>  ## Select your branch → Set path to "/azure-pipelines.yaml" → click on 
> "Continue" → click on "Variables"
>  ## Then click "New Variable" button, fill the name with "MODE", and the 
> value with "release". Click "OK" to set the variable and the "Save" button to 
> save the variables, then back on the "Review your pipeline" screen click 
> "Run" to trigger the build.
>  ## You should now see a build where only the "CI build (release)" is running
>  # Download the PyFlink wheel packages from the build result page after the 
> jobs of "build_wheels mac" and "build_wheels linux" have finished.
>  ## Download the PyFlink wheel packages
>  ### Open the build result page of the pipeline
>  ### Go to the {{Artifacts}} page (build_wheels linux -> 1 artifact)
>  ### Click {{wheel_Darwin_build_wheels mac}} and {{wheel_Linux_build_wheels 
> linux}} separately to download the zip files
>  ## Unzip these two zip files
> {code:bash}
> $ cd /path/to/downloaded_wheel_packages
> $ unzip wheel_Linux_build_wheels\ linux.zip
> $ unzip wheel_Darwin_build_wheels\ mac.zip{code}
>  ## Create directory {{./dist}} under the directory of {{{}flink-python{}}}:
> {code:bash}
> $ cd 
> $ mkdir flink-python/dist{code}
>  ## Move the unzipped wheel packages to the directory of 
> {{{}flink-python/dist{}}}:
> {code:java}
> $ mv /path/to/wheel_Darwin_build_wheels\ mac/* flink-

[jira] [Updated] (FLINK-35882) [1.20-rc2] Vote on the release candidate

2024-07-23 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35882?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-35882:
---
Summary: [1.20-rc2] Vote on the release candidate  (was: CLONE - 
[Release-1.20] Vote on the release candidate)

> [1.20-rc2] Vote on the release candidate
> 
>
> Key: FLINK-35882
> URL: https://issues.apache.org/jira/browse/FLINK-35882
> Project: Flink
>  Issue Type: Sub-task
>Affects Versions: 1.17.0
>Reporter: Weijie Guo
>Assignee: Weijie Guo
>Priority: Major
> Fix For: 1.17.0
>
>
> Once you have built and individually reviewed the release candidate, please 
> share it for the community-wide review. Please review foundation-wide [voting 
> guidelines|http://www.apache.org/foundation/voting.html] for more information.
> Start the review-and-vote thread on the dev@ mailing list. Here’s an email 
> template; please adjust as you see fit.
> {quote}From: Release Manager
> To: d...@flink.apache.org
> Subject: [VOTE] Release 1.2.3, release candidate #3
> Hi everyone,
> Please review and vote on the release candidate #3 for the version 1.2.3, as 
> follows:
> [ ] +1, Approve the release
> [ ] -1, Do not approve the release (please provide specific comments)
> The complete staging area is available for your review, which includes:
>  * JIRA release notes [1],
>  * the official Apache source release and binary convenience releases to be 
> deployed to dist.apache.org [2], which are signed with the key with 
> fingerprint  [3],
>  * all artifacts to be deployed to the Maven Central Repository [4],
>  * source code tag "release-1.2.3-rc3" [5],
>  * website pull request listing the new release and adding announcement blog 
> post [6].
> The vote will be open for at least 72 hours. It is adopted by majority 
> approval, with at least 3 PMC affirmative votes.
> Thanks,
> Release Manager
> [1] link
> [2] link
> [3] [https://dist.apache.org/repos/dist/release/flink/KEYS]
> [4] link
> [5] link
> [6] link
> {quote}
> *If there are any issues found in the release candidate, reply on the vote 
> thread to cancel the vote.* There’s no need to wait 72 hours. Proceed to the 
> Fix Issues step below and address the problem. However, some issues don’t 
> require cancellation. For example, if an issue is found in the website pull 
> request, just correct it on the spot and the vote can continue as-is.
> For cancelling a release, the release manager needs to send an email to the 
> release candidate thread, stating that the release candidate is officially 
> cancelled. Next, all artifacts created specifically for the RC in the 
> previous steps need to be removed:
>  * Delete the staging repository in Nexus
>  * Remove the source / binary RC files from dist.apache.org
>  * Delete the source code tag in git
> *If there are no issues, reply on the vote thread to close the voting.* Then, 
> tally the votes in a separate email. Here’s an email template; please adjust 
> as you see fit.
> {quote}From: Release Manager
> To: d...@flink.apache.org
> Subject: [RESULT] [VOTE] Release 1.2.3, release candidate #3
> I'm happy to announce that we have unanimously approved this release.
> There are XXX approving votes, XXX of which are binding:
>  * approver 1
>  * approver 2
>  * approver 3
>  * approver 4
> There are no disapproving votes.
> Thanks everyone!
> {quote}
>  
> 
> h3. Expectations
>  * Community votes to release the proposed candidate, with at least three 
> approving PMC votes
> Any issues that are raised till the vote is over should be either resolved or 
> moved into the next release (if applicable).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35882) [1.20-rc2] Vote on the release candidate

2024-07-23 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35882?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-35882:
---
Affects Version/s: 1.20.0
   (was: 1.17.0)

> [1.20-rc2] Vote on the release candidate
> 
>
> Key: FLINK-35882
> URL: https://issues.apache.org/jira/browse/FLINK-35882
> Project: Flink
>  Issue Type: Sub-task
>Affects Versions: 1.20.0
>Reporter: Weijie Guo
>Assignee: Weijie Guo
>Priority: Major
> Fix For: 1.17.0
>
>
> Once you have built and individually reviewed the release candidate, please 
> share it for the community-wide review. Please review foundation-wide [voting 
> guidelines|http://www.apache.org/foundation/voting.html] for more information.
> Start the review-and-vote thread on the dev@ mailing list. Here’s an email 
> template; please adjust as you see fit.
> {quote}From: Release Manager
> To: d...@flink.apache.org
> Subject: [VOTE] Release 1.2.3, release candidate #3
> Hi everyone,
> Please review and vote on the release candidate #3 for the version 1.2.3, as 
> follows:
> [ ] +1, Approve the release
> [ ] -1, Do not approve the release (please provide specific comments)
> The complete staging area is available for your review, which includes:
>  * JIRA release notes [1],
>  * the official Apache source release and binary convenience releases to be 
> deployed to dist.apache.org [2], which are signed with the key with 
> fingerprint  [3],
>  * all artifacts to be deployed to the Maven Central Repository [4],
>  * source code tag "release-1.2.3-rc3" [5],
>  * website pull request listing the new release and adding announcement blog 
> post [6].
> The vote will be open for at least 72 hours. It is adopted by majority 
> approval, with at least 3 PMC affirmative votes.
> Thanks,
> Release Manager
> [1] link
> [2] link
> [3] [https://dist.apache.org/repos/dist/release/flink/KEYS]
> [4] link
> [5] link
> [6] link
> {quote}
> *If there are any issues found in the release candidate, reply on the vote 
> thread to cancel the vote.* There’s no need to wait 72 hours. Proceed to the 
> Fix Issues step below and address the problem. However, some issues don’t 
> require cancellation. For example, if an issue is found in the website pull 
> request, just correct it on the spot and the vote can continue as-is.
> For cancelling a release, the release manager needs to send an email to the 
> release candidate thread, stating that the release candidate is officially 
> cancelled. Next, all artifacts created specifically for the RC in the 
> previous steps need to be removed:
>  * Delete the staging repository in Nexus
>  * Remove the source / binary RC files from dist.apache.org
>  * Delete the source code tag in git
> *If there are no issues, reply on the vote thread to close the voting.* Then, 
> tally the votes in a separate email. Here’s an email template; please adjust 
> as you see fit.
> {quote}From: Release Manager
> To: d...@flink.apache.org
> Subject: [RESULT] [VOTE] Release 1.2.3, release candidate #3
> I'm happy to announce that we have unanimously approved this release.
> There are XXX approving votes, XXX of which are binding:
>  * approver 1
>  * approver 2
>  * approver 3
>  * approver 4
> There are no disapproving votes.
> Thanks everyone!
> {quote}
>  
> 
> h3. Expectations
>  * Community votes to release the proposed candidate, with at least three 
> approving PMC votes
> Any issues that are raised till the vote is over should be either resolved or 
> moved into the next release (if applicable).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-07-23 Thread via GitHub


hlteoh37 commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1687672586


##
flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsSinkBuilderTest.java:
##
@@ -25,14 +25,6 @@
 /** Covers construction, defaults and sanity checking of {@link SqsSinkBuilder}. */
 class SqsSinkBuilderTest {
 
-    @Test
-    void elementConverterOfSinkMustBeSetWhenBuilt() {
-        Assertions.assertThatExceptionOfType(NullPointerException.class)
-                .isThrownBy(() -> SqsSink.builder().setSqsUrl("sqlUrl").build())
-                .withMessageContaining(
-                        "No SerializationSchema was supplied to the SQS Sink builder.");
-    }
-

Review Comment:
   Why did we remove this unit test?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35877] Shade protobuf in flink [flink]

2024-07-23 Thread via GitHub


flinkbot commented on PR #25112:
URL: https://github.com/apache/flink/pull/25112#issuecomment-2244623653

   
   ## CI report:
   
   * 8a98ca6989b8a1192f72c4d0fd58529e99f9bfd5 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35737] Prevent Memory Leak by Closing MemoryExecutionGraphInfoStore on MiniCluster Shutdown [flink]

2024-07-23 Thread via GitHub


fengjiajie commented on PR #25009:
URL: https://github.com/apache/flink/pull/25009#issuecomment-2244648203

   Hi @Samrat002, I was wondering if you had a chance to take another look at 
this PR. Please let me know if any further changes are needed. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-35868) Bump Mongo driver version to support Mongo 7.0+

2024-07-23 Thread Jiabao Sun (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiabao Sun reassigned FLINK-35868:
--

Assignee: yux

> Bump Mongo driver version to support Mongo 7.0+
> ---
>
> Key: FLINK-35868
> URL: https://issues.apache.org/jira/browse/FLINK-35868
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: yux
>Assignee: yux
>Priority: Major
>  Labels: pull-request-available
>
> Currently, the MongoDB CDC connector depends on mongodb-driver v4.9.1, which 
> doesn't support Mongo Server 7.0+ [1]. Upgrading the dependency version would 
> be nice, since Mongo 7.0 was released nearly a year ago.
> [1] https://www.mongodb.com/docs/drivers/java/sync/current/compatibility/



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35623] Bump mongo-driver version from 4.7.2 to 5.1.1 to support MongoDB 7.0 [flink-connector-mongodb]

2024-07-23 Thread via GitHub


Jiabao-Sun commented on PR #36:
URL: 
https://github.com/apache/flink-connector-mongodb/pull/36#issuecomment-2244667146

   Hi @yux, could you help review this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35868][cdc-connector][mongodb] Add MongoDB 6.0 & 7.0 tests [flink-cdc]

2024-07-23 Thread via GitHub


Jiabao-Sun commented on PR #3489:
URL: https://github.com/apache/flink-cdc/pull/3489#issuecomment-2244672599

   Shall we bump the driver version from 4.7.1 to 5.1.1 as well?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35877) Shade protobuf in flink

2024-07-23 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868001#comment-17868001
 ] 

Martijn Visser commented on FLINK-35877:


This is tagged for Shaded, but it's not in 
https://github.com/apache/flink-shaded, so this isn't the right component.

> Shade protobuf in flink
> ---
>
> Key: FLINK-35877
> URL: https://issues.apache.org/jira/browse/FLINK-35877
> Project: Flink
>  Issue Type: Improvement
>  Components: BuildSystem / Shaded
>Affects Versions: 1.19.2
>Reporter: zhuanshenbsj1
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.19.2
>
>
> Shade the classes in protobuf to avoid class conflict.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35868][cdc-connector][mongodb] Bump dependency version to support MongoDB 7.0 [flink-cdc]

2024-07-23 Thread via GitHub


yuxiqian commented on PR #3489:
URL: https://github.com/apache/flink-cdc/pull/3489#issuecomment-2244705856

   > Shall we bump the driver version from 4.7.1 to 5.1.1 as well?
   
   Done, bumped `mongo-kafka` version, too.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35623] Bump mongo-driver version from 4.7.2 to 5.1.1 to support MongoDB 7.0 [flink-connector-mongodb]

2024-07-23 Thread via GitHub


Jiabao-Sun merged PR #36:
URL: https://github.com/apache/flink-connector-mongodb/pull/36


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-35623) Bump mongo-driver version from 4.7.2 to 5.1.1 to support MongoDB 7.0

2024-07-23 Thread Jiabao Sun (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiabao Sun resolved FLINK-35623.

Resolution: Implemented

main: a7551187d904ed819db085fc36c2cf735913ed5e

> Bump mongo-driver version from 4.7.2 to 5.1.1 to support MongoDB 7.0
> 
>
> Key: FLINK-35623
> URL: https://issues.apache.org/jira/browse/FLINK-35623
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / MongoDB
>Affects Versions: mongodb-1.2.0
>Reporter: Jiabao Sun
>Assignee: Jiabao Sun
>Priority: Major
>  Labels: pull-request-available
> Fix For: mongodb-1.3.0
>
>
> Bump mongo-driver version from 4.7.2 to 5.1.1 to support MongoDB 7.0
>  
> [https://www.mongodb.com/docs/drivers/java/sync/current/compatibility/]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2024-07-23 Thread via GitHub


1996fanrui commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1687687114


##
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategyTest.java:
##
@@ -69,46 +69,29 @@ class LocalInputPreferredSlotSharingStrategyTest {
 private TestingSchedulingExecutionVertex ev21;
 private TestingSchedulingExecutionVertex ev22;
 
-private Set slotSharingGroups;
-
-@BeforeEach
-void setUp() {
-topology = new TestingSchedulingTopology();
-
+private void setupCase() {
 ev11 = topology.newExecutionVertex(JOB_VERTEX_ID_1, 0);
 ev12 = topology.newExecutionVertex(JOB_VERTEX_ID_1, 1);
 
 ev21 = topology.newExecutionVertex(JOB_VERTEX_ID_2, 0);
 ev22 = topology.newExecutionVertex(JOB_VERTEX_ID_2, 1);
 
-final SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
-slotSharingGroup.addVertexToGroup(JOB_VERTEX_ID_1);
-slotSharingGroup.addVertexToGroup(JOB_VERTEX_ID_2);
-slotSharingGroups = Collections.singleton(slotSharingGroup);
+slotsharingGroup.addVertexToGroup(JOB_VERTEX_ID_1);
+slotsharingGroup.addVertexToGroup(JOB_VERTEX_ID_2);
 }
 
-@Test
-void testCoLocationConstraintIsRespected() {
-topology.connect(ev11, ev22);
-topology.connect(ev12, ev21);
-
-final CoLocationGroup coLocationGroup =
-new TestingCoLocationGroup(JOB_VERTEX_ID_1, JOB_VERTEX_ID_2);
-final Set coLocationGroups = 
Collections.singleton(coLocationGroup);
-
-final SlotSharingStrategy strategy =
-new LocalInputPreferredSlotSharingStrategy(
-topology, slotSharingGroups, coLocationGroups);
-
-assertThat(strategy.getExecutionSlotSharingGroups()).hasSize(2);
-
assertThat(strategy.getExecutionSlotSharingGroup(ev11.getId()).getExecutionVertexIds())
-.contains(ev11.getId(), ev21.getId());
-
assertThat(strategy.getExecutionSlotSharingGroup(ev12.getId()).getExecutionVertexIds())
-.contains(ev12.getId(), ev22.getId());
+@Override
+protected SlotSharingStrategy getSlotSharingStrategy(
+SchedulingTopology topology,
+Set slotSharingGroups,
+Set coLocationGroups) {
+return new LocalInputPreferredSlotSharingStrategy(
+topology, slotSharingGroups, coLocationGroups);
 }
 
 @Test
 void testInputLocalityIsRespectedWithRescaleEdge() {
+setupCase();

Review Comment:
   It's a little weird to call `setupCase` in so many tests.
   
   Could we define a `protected abstract void doSetup();` in 
`AbstractSlotSharingStrategyTest` and have `setUp` call `doSetup()`?
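   
   A minimal sketch of that shape, assuming JUnit 5 and the existing 
`TestingSchedulingTopology` test class; everything beyond 
`AbstractSlotSharingStrategyTest` and `doSetup()` is illustrative:
   
   ```java
   import org.junit.jupiter.api.BeforeEach;
   
   abstract class AbstractSlotSharingStrategyTest {
   
       protected TestingSchedulingTopology topology;
   
       @BeforeEach
       void setUp() {
           topology = new TestingSchedulingTopology();
           // Subclasses build their vertices / sharing groups here, so individual
           // test methods no longer have to call setupCase() themselves.
           doSetup();
       }
   
       protected abstract void doSetup();
   }
   ```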



##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategy.java:
##
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This strategy tries to get a balanced task scheduling. Execution vertices, 
which belong to
+ * the same SlotSharingGroup, tend to be put evenly in each 
ExecutionSlotSharingGroup. Co-location
+ * con

Re: [PR] [FLINK-35877] Shade protobuf in flink [flink]

2024-07-23 Thread via GitHub


zhuanshenbsj1 commented on PR #25112:
URL: https://github.com/apache/flink/pull/25112#issuecomment-2244784197

   > Which part of the Flink runtime/system relies on Protobuf, that would 
justify shading Protobuf?
   
   This is mainly to avoid dependency conflicts between the Protobuf used in 
the developer's code and the Protobuf used by Flink itself. Protobuf is very 
commonly used in development.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33977) Adaptive scheduler may not minimize the number of TMs during downscaling

2024-07-23 Thread RocMarshal (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

RocMarshal updated FLINK-33977:
---
Attachment: screenshot-1.png

> Adaptive scheduler may not minimize the number of TMs during downscaling
> 
>
> Key: FLINK-33977
> URL: https://issues.apache.org/jira/browse/FLINK-33977
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler, Runtime / Coordination
>Affects Versions: 1.18.0, 1.19.0, 1.20.0
>Reporter: Zhanghao Chen
>Priority: Major
> Attachments: screenshot-1.png
>
>
> Adaptive Scheduler uses SlotAssigner to assign free slots to slot sharing 
> groups. Currently, there're two implementations of SlotAssigner available: 
> the 
> DefaultSlotAssigner that treats all slots and slot sharing groups equally and 
> the {color:#172b4d}StateLocalitySlotAssigner{color} that assigns slots based 
> on the number of local key groups to utilize local state recovery. The 
> scheduler will use the DefaultSlotAssigner when no key group assignment info 
> is available and use the StateLocalitySlotAssigner otherwise.
>  
> However, none of the SlotAssigner targets at minimizing the number of TMs, 
> which may produce suboptimal slot assignment under the Application Mode. For 
> example, when a job with 8 slot sharing groups and 2 TMs (each 4 slots) is 
> downscaled through the FLIP-291 API to have 4 slot sharing groups instead, 
> the cluster may still have 2 TMs, one with 1 free slot, and the other with 3 
> free slots. For end-users, this implies an ineffective downscaling as the 
> total cluster resources are not reduced.
>  
> We should take minimizing number of TMs into consideration as well. A 
> possible approach is to enhance the {color:#172b4d}StateLocalitySlotAssigner: 
> when the number of free slots exceeds need, sort all the TMs by a score 
> summing from the allocation scores of all slots on it, remove slots from the 
> excessive TMs with the lowest score and proceed the remaining slot 
> assignment.{color}
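
A rough sketch of the pruning step proposed above; every class and field name 
here is illustrative and not part of the actual SlotAssigner API:

{code:java}
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class TaskManagerPruningSketch {

    /** Illustrative stand-in for a TM and the allocation scores of its free slots. */
    static final class ScoredTaskManager {
        final List<Long> slotScores = new ArrayList<>();

        long totalScore() {
            return slotScores.stream().mapToLong(Long::longValue).sum();
        }
    }

    /** Keep the highest-scoring TMs until the demand is covered; the rest can be released. */
    static List<ScoredTaskManager> selectTaskManagers(
            List<ScoredTaskManager> taskManagers, int requiredSlots) {
        List<ScoredTaskManager> sorted = new ArrayList<>(taskManagers);
        sorted.sort(Comparator.comparingLong(ScoredTaskManager::totalScore).reversed());

        List<ScoredTaskManager> selected = new ArrayList<>();
        int coveredSlots = 0;
        for (ScoredTaskManager tm : sorted) {
            if (coveredSlots >= requiredSlots) {
                break; // remaining low-score TMs contribute nothing and can be freed
            }
            selected.add(tm);
            coveredSlots += tm.slotScores.size();
        }
        return selected;
    }
}
{code}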



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33977) Adaptive scheduler may not minimize the number of TMs during downscaling

2024-07-23 Thread RocMarshal (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868012#comment-17868012
 ] 

RocMarshal commented on FLINK-33977:


Thanks [~Zhanghao Chen]  for reporting it.

This issue is also very serious within our jobs. After scaling down, resources 
cannot be released.

 !screenshot-1.png! 

As shown in the figure, 6 TMs, each using only 1 slot.

BTW, is there any progress here?
I'm interested in this Jira; may I take the ticket?
Thank you very much!


> Adaptive scheduler may not minimize the number of TMs during downscaling
> 
>
> Key: FLINK-33977
> URL: https://issues.apache.org/jira/browse/FLINK-33977
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler, Runtime / Coordination
>Affects Versions: 1.18.0, 1.19.0, 1.20.0
>Reporter: Zhanghao Chen
>Priority: Major
> Attachments: screenshot-1.png
>
>
> Adaptive Scheduler uses SlotAssigner to assign free slots to slot sharing 
> groups. Currently, there're two implementations of SlotAssigner available: 
> the 
> DefaultSlotAssigner that treats all slots and slot sharing groups equally and 
> the {color:#172b4d}StateLocalitySlotAssigner{color} that assigns slots based 
> on the number of local key groups to utilize local state recovery. The 
> scheduler will use the DefaultSlotAssigner when no key group assignment info 
> is available and use the StateLocalitySlotAssigner otherwise.
>  
> However, none of the SlotAssigner targets at minimizing the number of TMs, 
> which may produce suboptimal slot assignment under the Application Mode. For 
> example, when a job with 8 slot sharing groups and 2 TMs (each 4 slots) is 
> downscaled through the FLIP-291 API to have 4 slot sharing groups instead, 
> the cluster may still have 2 TMs, one with 1 free slot, and the other with 3 
> free slots. For end-users, this implies an ineffective downscaling as the 
> total cluster resources are not reduced.
>  
> We should take minimizing number of TMs into consideration as well. A 
> possible approach is to enhance the {color:#172b4d}StateLocalitySlotAssigner: 
> when the number of free slots exceeds need, sort all the TMs by a score 
> summing from the allocation scores of all slots on it, remove slots from the 
> excessive TMs with the lowest score and proceed the remaining slot 
> assignment.{color}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35874][cdc-connector][mysql] Check pureBinlogPhaseTables set before call getBinlogPosition method [flink-cdc]

2024-07-23 Thread via GitHub


qiaozongmi commented on PR #3488:
URL: https://github.com/apache/flink-cdc/pull/3488#issuecomment-2244820245

   @leonardBang PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35874) Check pureBinlogPhaseTables set before call getBinlogPosition method in BinlogSplitReader

2024-07-23 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35874:
---
Labels: pull-request-available  (was: )

> Check pureBinlogPhaseTables set before call getBinlogPosition method in 
> BinlogSplitReader
> -
>
> Key: FLINK-35874
> URL: https://issues.apache.org/jira/browse/FLINK-35874
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: Zhongmin Qiao
>Assignee: Zhongmin Qiao
>Priority: Minor
>  Labels: pull-request-available
> Attachments: image-2024-07-22-19-26-59-158.png, 
> image-2024-07-22-19-27-19-366.png, image-2024-07-22-19-30-08-989.png, 
> image-2024-07-22-19-36-20-481.png, image-2024-07-22-19-36-40-581.png, 
> image-2024-07-22-19-37-35-542.png, image-2024-07-22-21-12-03-316.png
>
>
> The getBinlogPosition method of RecordUtils, which is called by 
> BinlogSplitReader.shouldEmit, is a highly performance-consuming method. This is because it 
> iterates through the sourceOffset map of the SourceRecord, and during the 
> iteration, it also performs a toString() conversion on the value. Finally, it 
> calls the putAll method of BinlogOffsetBuilder to put all the elements 
> obtained from the iteration into the offsetMap (which involves another map 
> traversal and hashcode computation). Despite the significant performance 
> impact of getBinlogPosition, we still need to call it when emitting each 
> DataChangeRecord, which reduces the efficiency of data processing in Flink 
> CDC.
> !image-2024-07-22-19-26-59-158.png|width=545,height=222!
> !image-2024-07-22-19-27-19-366.png|width=545,height=119!
> However, we can optimize and avoid frequent invocations of getBinlogPosition 
> by moving the check pureBinlogPhaseTables.contains(tableId) in the 
> hasEnterPureBinlogPhase method before calling getBinlogPosition. This way, if 
> the SourceRecord belongs to a pure binlog phase table, we can directly return 
> true without the need for the highly performance-consuming getBinlogPosition 
> method.
> diff
> !image-2024-07-22-21-12-03-316.png|width=548,height=236!
>  
>  
>  
>  
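
A minimal sketch of the reordered check described above; the method signatures 
are assumptions and may differ from the actual BinlogSplitReader/RecordUtils code:

{code:java}
private boolean shouldEmit(SourceRecord sourceRecord) {
    TableId tableId = RecordUtils.getTableId(sourceRecord);
    // Fast path: tables already known to be in the pure binlog phase can be
    // emitted without rebuilding a BinlogOffset from the source offset map.
    if (pureBinlogPhaseTables.contains(tableId)) {
        return true;
    }
    // Slow path: only now pay for the expensive offset extraction.
    BinlogOffset position = RecordUtils.getBinlogPosition(sourceRecord);
    return hasEnterPureBinlogPhase(tableId, position);
}
{code}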



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-33874][runtime] Support resource request wait mechanism at DefaultDeclarativeSlotPool side for Default Scheduler [flink]

2024-07-23 Thread via GitHub


RocMarshal opened a new pull request, #25113:
URL: https://github.com/apache/flink/pull/25113

   
   
   
   ## What is the purpose of the change
   
   Support resource request wait mechanism at DefaultDeclarativeSlotPool side 
for Default Scheduler
   
   
   ## Brief change log
   
   - introduce slot.request.max-interval 
   - implement the mechanism.
   
   
   ## Verifying this change
   
   
   This change is already covered by added corresponding test cases.
   
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager 
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (not applicable / **docs** / 
JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33874) Support resource request wait mechanism at DefaultDeclarativeSlotPool side for Default Scheduler

2024-07-23 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33874:
---
Labels: pull-request-available  (was: )

> Support resource request wait mechanism at DefaultDeclarativeSlotPool side 
> for Default Scheduler
> 
>
> Key: FLINK-33874
> URL: https://issues.apache.org/jira/browse/FLINK-33874
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Task
>Reporter: RocMarshal
>Assignee: RocMarshal
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33874][runtime] Support resource request wait mechanism at DefaultDeclarativeSlotPool side for Default Scheduler [flink]

2024-07-23 Thread via GitHub


flinkbot commented on PR #25113:
URL: https://github.com/apache/flink/pull/25113#issuecomment-2244834549

   
   ## CI report:
   
   * 7e539f5fc908f38eebba16f9480bcf3f5d5009c3 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35883) Wildcard projection inserts column at wrong place

2024-07-23 Thread yux (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868022#comment-17868022
 ] 

yux commented on FLINK-35883:
-

I'd like to investigate this based on FLINK-35272.

> Wildcard projection inserts column at wrong place
> -
>
> Key: FLINK-35883
> URL: https://issues.apache.org/jira/browse/FLINK-35883
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Reporter: yux
>Priority: Major
>
> In this case where a wildcard projection was declared:
> ```yaml
> transform:
>   - projection: \*, 'extras' AS extras
> ```
> For upstream schema [a, b, c], transform operator should send [a, b, c, 
> extras] to downstream.
> However, if another column 'd' was inserted at the end, upstream schema would 
> be [a, b, c, d], and one might expect transformed schema to be [a, b, c, d, 
> extras]. But it's [a, b, c, extras, d], since `AddColumnEvent{d, 
> position=LAST}` was applied to [a, b, c, extras] after the projection process.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35883) Wildcard projection inserts column at wrong place

2024-07-23 Thread yux (Jira)
yux created FLINK-35883:
---

 Summary: Wildcard projection inserts column at wrong place
 Key: FLINK-35883
 URL: https://issues.apache.org/jira/browse/FLINK-35883
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Reporter: yux


In this case where a wildcard projection was declared:
```yaml
transform:
  - projection: \*, 'extras' AS extras
```
For upstream schema [a, b, c], transform operator should send [a, b, c, extras] 
to downstream.

However, if another column 'd' was inserted at the end, upstream schema would 
be [a, b, c, d], and one might expect transformed schema to be [a, b, c, d, 
extras]. But it's [a, b, c, extras, d], since `AddColumnEvent{d, 
position=LAST}` was applied to [a, b, c, extras] after the projection process.
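
A purely illustrative sketch of the expected re-anchoring; the types below are 
placeholders, not the actual Flink CDC event classes:

```java
import java.util.ArrayList;
import java.util.List;

class WildcardReanchorSketch {
    // Insert an upstream "add column at LAST" before the columns appended by the
    // wildcard projection, so [a, b, c, extras] + d becomes [a, b, c, d, extras].
    static List<String> applyAddLast(
            List<String> projectedColumns, int columnsAppendedByProjection, String newColumn) {
        List<String> result = new ArrayList<>(projectedColumns);
        result.add(result.size() - columnsAppendedByProjection, newColumn);
        return result;
    }
}
```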



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35272][cdc][runtime] Pipeline Transform job supports omitting / renaming calculation column [flink-cdc]

2024-07-23 Thread via GitHub


leonardBang commented on code in PR #3285:
URL: https://github.com/apache/flink-cdc/pull/3285#discussion_r1687881675


##
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/ddl/data_types_test.sql:
##
@@ -0,0 +1,28 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--  http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+DROP TABLE IF EXISTS DATA_TYPES_TABLE;
+
+CREATE TABLE DATA_TYPES_TABLE (
+ID INT NOT NULL,
+TS DATETIME(0),
+NUM DECIMAL(10, 2),
+PRIMARY KEY (ID)
+);

Review Comment:
   The types are not that complex from my point of view; could you refer to 
`fullTypesTest` of the MySQL CDC source?



##
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java:
##
@@ -56,6 +67,181 @@ public static List 
createFieldGetters(List colum
 return fieldGetters;
 }
 
+/** Restore original data fields from RecordData structure. */
+public static List restoreOriginalData(
+@Nullable RecordData recordData, List 
fieldGetters) {
+if (recordData == null) {
+return Collections.emptyList();
+}
+List actualFields = new ArrayList<>();
+for (RecordData.FieldGetter fieldGetter : fieldGetters) {
+actualFields.add(fieldGetter.getFieldOrNull(recordData));
+}
+return actualFields;
+}
+
+/** Merge compatible upstream schemas. */
+public static Schema mergeCompatibleSchemas(List schemas) {
+if (schemas.isEmpty()) {
+return null;
+} else if (schemas.size() == 1) {
+return schemas.get(0);
+} else {
+Schema outputSchema = null;
+for (Schema schema : schemas) {
+outputSchema = mergeSchema(outputSchema, schema);
+}
+return outputSchema;
+}
+}
+
+/** Try to combine two schemas with potential incompatible type. */
+@VisibleForTesting
+public static Schema mergeSchema(@Nullable Schema lSchema, Schema rSchema) 
{
+if (lSchema == null) {
+return rSchema;
+}
+if (lSchema.getColumnCount() != rSchema.getColumnCount()) {
+throw new IllegalStateException(
+String.format(
+"Unable to merge schema %s and %s with different 
column counts.",
+lSchema, rSchema));
+}
+if (!lSchema.primaryKeys().equals(rSchema.primaryKeys())) {
+throw new IllegalStateException(
+String.format(
+"Unable to merge schema %s and %s with different 
primary keys.",
+lSchema, rSchema));
+}
+if (!lSchema.partitionKeys().equals(rSchema.partitionKeys())) {
+throw new IllegalStateException(
+String.format(
+"Unable to merge schema %s and %s with different 
partition keys.",
+lSchema, rSchema));
+}
+if (!lSchema.options().equals(rSchema.options())) {
+throw new IllegalStateException(
+String.format(
+"Unable to merge schema %s and %s with different 
options.",
+lSchema, rSchema));
+}
+if (!Objects.equals(lSchema.comment(), rSchema.comment())) {
+throw new IllegalStateException(
+String.format(
+"Unable to merge schema %s and %s with different 
comments.",
+lSchema, rSchema));
+}
+
+List leftColumns = lSchema.getColumns();
+List rightColumns = rSchema.getColumns();
+
+List mergedColumns =
+IntStream.range(0, lSchema.getColumnCount())
+.mapToObj(i -> mergeColumn(leftColumns.get(i), 
rightColumns.get(i)))
+.collect(Collectors.toList());
+
+return lSchema.copy(mergedColumns);
+}
+
+/** Try to combine two columns with potential incompatible type. */
+@VisibleForTesting
+public static Column mergeColumn(Column lColumn, Column rColumn) {
+if (!Objects.equals(lColumn.getName(), rColumn.getName())) {
+thro

Re: [PR] [FLINK-35868][cdc-connector][mongodb] Bump dependency version to support MongoDB 7.0 [flink-cdc]

2024-07-23 Thread via GitHub


leonardBang commented on code in PR #3489:
URL: https://github.com/apache/flink-cdc/pull/3489#discussion_r1687920162


##
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/pom.xml:
##
@@ -69,7 +69,7 @@ limitations under the License.
 
 org.mongodb
 mongodb-driver-sync
-4.9.1
+5.1.1

Review Comment:
   Could you also update the docs? The older (release-3.1) version docs 
seem to use an incorrect driver version.
   
https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/overview/



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35884) Support

2024-07-23 Thread JunboWang (Jira)
JunboWang created FLINK-35884:
-

 Summary: Support
 Key: FLINK-35884
 URL: https://issues.apache.org/jira/browse/FLINK-35884
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Reporter: JunboWang






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35884) Pipeline connector MySQL support snapshot.chunk.key-column

2024-07-23 Thread JunboWang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

JunboWang updated FLINK-35884:
--
Summary: Pipeline connector MySQL support snapshot.chunk.key-column  (was: 
Support)

> Pipeline connector MySQL support snapshot.chunk.key-column
> --
>
> Key: FLINK-35884
> URL: https://issues.apache.org/jira/browse/FLINK-35884
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: JunboWang
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]

2024-07-23 Thread via GitHub


ferenc-csaky commented on code in PR #821:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1687799832


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java:
##
@@ -426,6 +438,14 @@ public static String operatorConfigKey(String key) {
 .withDescription(
 "Max allowed checkpoint age for initiating 
last-state upgrades on running jobs. If a checkpoint is not available within 
the desired age (and nothing in progress) a savepoint will be triggered.");
 
+@Documentation.Section(SECTION_DYNAMIC)
+public static final ConfigOption 
OPERATOR_JOB_SAVEPOINT_DISPOSE_ON_DELETE =
+operatorConfig("savepoint.dispose-on-delete")
+.booleanType()
+.defaultValue(false)
+.withDescription(
+"Savepoint data for FlinkStateSnapshot resources 
created by the operator during upgrades and periodic savepoints will be 
disposed of automatically when the generated Kubernetes resource is deleted.");

Review Comment:
   nit: "...savepoints will be disposed ~of~ automatically when..."



##
flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkStateSnapshotState.java:
##
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.api.status;
+
+import org.apache.flink.annotation.Experimental;
+
+/** Describes current snapshot state. */
+@Experimental
+public enum FlinkStateSnapshotState {

Review Comment:
   Maybe move this inside the `FlinkStateSnapshotStatus`, and then the enum 
name could be simply `State`. This might improve the overall readability and we 
can work around the kind of confusing wording here a bit better.
   
   I see that `ReconciliationStatus` and `ReconciliationState` follow the 
current setup and this was just set up the same way, but the wording is 
straightforward there.
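   
   A tiny sketch of the suggested nesting; the enum constants below are just 
placeholders for illustration:
   
   ```java
   public class FlinkStateSnapshotStatus {
   
       /** Nested so call sites read FlinkStateSnapshotStatus.State.COMPLETED. */
       public enum State {
           // Placeholder constants, not the actual values.
           TRIGGER_PENDING,
           IN_PROGRESS,
           COMPLETED,
           FAILED
       }
   
       private State state;
   }
   ```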



##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java:
##
@@ -297,6 +297,18 @@ public static String operatorConfigKey(String key) {
 "Custom HTTP header for HttpArtifactFetcher. The 
header will be applied when getting the session job artifacts. "
 + "Expected format: 
headerKey1:headerValue1,headerKey2:headerValue2.");
 
+@Documentation.Section(SECTION_DYNAMIC)
+public static final ConfigOption SNAPSHOT_RESOURCE_ENABLED =
+operatorConfig("snapshot.resource.enabled")
+.booleanType()
+.defaultValue(true)
+.withDescription(
+"Create new FlinkStateSnapshot resources for 
storing snapshots. "
++ "Disable if you wish to use the 
deprecated mode and save snapshot results to "
++ "FlinkDeployment/FlinkSessionJob status 
fields. The Operator will fallback to the "

Review Comment:
   nit: "The Operator will fallback to ~the~"



##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotController.java:
##
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+pac

[jira] [Commented] (FLINK-33977) Adaptive scheduler may not minimize the number of TMs during downscaling

2024-07-23 Thread Zhanghao Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868038#comment-17868038
 ] 

Zhanghao Chen commented on FLINK-33977:
---

Hi [~RocMarshal], there's no progress on my side. Please go ahead~

> Adaptive scheduler may not minimize the number of TMs during downscaling
> 
>
> Key: FLINK-33977
> URL: https://issues.apache.org/jira/browse/FLINK-33977
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler, Runtime / Coordination
>Affects Versions: 1.18.0, 1.19.0, 1.20.0
>Reporter: Zhanghao Chen
>Priority: Major
> Attachments: screenshot-1.png
>
>
> Adaptive Scheduler uses SlotAssigner to assign free slots to slot sharing 
> groups. Currently, there're two implementations of SlotAssigner available: 
> the 
> DefaultSlotAssigner that treats all slots and slot sharing groups equally and 
> the {color:#172b4d}StateLocalitySlotAssigner{color} that assigns slots based 
> on the number of local key groups to utilize local state recovery. The 
> scheduler will use the DefaultSlotAssigner when no key group assignment info 
> is available and use the StateLocalitySlotAssigner otherwise.
>  
> However, none of the SlotAssigner targets at minimizing the number of TMs, 
> which may produce suboptimal slot assignment under the Application Mode. For 
> example, when a job with 8 slot sharing groups and 2 TMs (each 4 slots) is 
> downscaled through the FLIP-291 API to have 4 slot sharing groups instead, 
> the cluster may still have 2 TMs, one with 1 free slot, and the other with 3 
> free slots. For end-users, this implies an ineffective downscaling as the 
> total cluster resources are not reduced.
>  
> We should take minimizing number of TMs into consideration as well. A 
> possible approach is to enhance the {color:#172b4d}StateLocalitySlotAssigner: 
> when the number of free slots exceeds need, sort all the TMs by a score 
> summing from the allocation scores of all slots on it, remove slots from the 
> excessive TMs with the lowest score and proceed the remaining slot 
> assignment.{color}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35868][cdc-connector][mongodb] Bump dependency version to support MongoDB 7.0 [flink-cdc]

2024-07-23 Thread via GitHub


yuxiqian commented on PR #3489:
URL: https://github.com/apache/flink-cdc/pull/3489#issuecomment-2245079611

   I'll investigate this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35884) Pipeline connector MySQL support snapshot.chunk.key-column

2024-07-23 Thread JunboWang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

JunboWang updated FLINK-35884:
--
Description: flink-connector-mysql-cdc `MySqlSourceOptions` supports 
specifying the parameter scan.incremental.snapshot.chunk.key-column to divide 
chunks, pipeline connector should also support.

> Pipeline connector MySQL support snapshot.chunk.key-column
> --
>
> Key: FLINK-35884
> URL: https://issues.apache.org/jira/browse/FLINK-35884
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: JunboWang
>Priority: Minor
>
> flink-connector-mysql-cdc `MySqlSourceOptions` supports specifying the 
> parameter scan.incremental.snapshot.chunk.key-column to divide chunks, 
> pipeline connector should also support.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35884) Pipeline connector MySQL support scan.incremental.snapshot.chunk.key-column

2024-07-23 Thread JunboWang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

JunboWang updated FLINK-35884:
--
Summary: Pipeline connector MySQL support 
scan.incremental.snapshot.chunk.key-column  (was: Pipeline connector MySQL 
support snapshot.chunk.key-column)

> Pipeline connector MySQL support scan.incremental.snapshot.chunk.key-column
> ---
>
> Key: FLINK-35884
> URL: https://issues.apache.org/jira/browse/FLINK-35884
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: JunboWang
>Priority: Minor
>
> flink-connector-mysql-cdc `MySqlSourceOptions` supports specifying the 
> parameter scan.incremental.snapshot.chunk.key-column to divide chunks, 
> pipeline connector should also support.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35884) Pipeline MySQL connector support scan.incremental.snapshot.chunk.key-column

2024-07-23 Thread JunboWang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

JunboWang updated FLINK-35884:
--
Summary: Pipeline MySQL connector support 
scan.incremental.snapshot.chunk.key-column  (was: Pipeline connector MySQL 
support scan.incremental.snapshot.chunk.key-column)

> Pipeline MySQL connector support scan.incremental.snapshot.chunk.key-column
> ---
>
> Key: FLINK-35884
> URL: https://issues.apache.org/jira/browse/FLINK-35884
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: JunboWang
>Priority: Minor
>
> flink-connector-mysql-cdc `MySqlSourceOptions` supports specifying the 
> parameter scan.incremental.snapshot.chunk.key-column to divide chunks, 
> pipeline connector should also support.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-35884] MySQL pipeline support snapshot chunk key-column [flink-cdc]

2024-07-23 Thread via GitHub


beryllw opened a new pull request, #3490:
URL: https://github.com/apache/flink-cdc/pull/3490

   flink-connector-mysql-cdc `MySqlSourceOptions` supports specifying the 
parameter scan.incremental.snapshot.chunk.key-column to divide chunks; the pipeline 
connector should also support it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35884) Pipeline MySQL connector supports setting chunk column key

2024-07-23 Thread JunboWang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

JunboWang updated FLINK-35884:
--
Summary: Pipeline MySQL connector supports setting chunk column key  (was: 
Pipeline MySQL connector support scan.incremental.snapshot.chunk.key-column)

> Pipeline MySQL connector supports setting chunk column key
> --
>
> Key: FLINK-35884
> URL: https://issues.apache.org/jira/browse/FLINK-35884
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: JunboWang
>Priority: Minor
>
> flink-connector-mysql-cdc `MySqlSourceOptions` supports specifying the 
> parameter scan.incremental.snapshot.chunk.key-column to divide chunks, 
> pipeline connector should also support.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35884) Pipeline MySQL connector supports setting chunk column key

2024-07-23 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35884:
---
Labels: pull-request-available  (was: )

> Pipeline MySQL connector supports setting chunk column key
> --
>
> Key: FLINK-35884
> URL: https://issues.apache.org/jira/browse/FLINK-35884
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: JunboWang
>Priority: Minor
>  Labels: pull-request-available
>
> flink-connector-mysql-cdc `MySqlSourceOptions` supports specifying the 
> parameter scan.incremental.snapshot.chunk.key-column to divide chunks, 
> pipeline connector should also support.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35242] Supports per-SE type configuration & "lenient" evolution behavior [flink-cdc]

2024-07-23 Thread via GitHub


leonardBang commented on code in PR #3339:
URL: https://github.com/apache/flink-cdc/pull/3339#discussion_r1687943729


##
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/DataSink.java:
##
@@ -30,5 +33,5 @@ public interface DataSink {
 EventSinkProvider getEventSinkProvider();
 
 /** Get the {@link MetadataApplier} for applying metadata changes to 
external systems. */
-MetadataApplier getMetadataApplier();
+MetadataApplier getMetadataApplier(Set 
enabledEventTypes);

Review Comment:
   This is a public API; we should keep backward compatibility.
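   
   One hedged option (a sketch, not necessarily the final design): keep the 
existing no-arg method and add the new variant as a default method, e.g.
   
   ```java
   import org.apache.flink.cdc.common.annotation.PublicEvolving;
   import org.apache.flink.cdc.common.event.SchemaChangeEventType;
   
   import java.util.Set;
   
   @PublicEvolving
   public interface DataSink {
   
       EventSinkProvider getEventSinkProvider();
   
       /** Existing method stays unchanged, so current connector implementations keep compiling. */
       MetadataApplier getMetadataApplier();
   
       /** New overload whose default simply ignores the enabled event types. */
       default MetadataApplier getMetadataApplier(Set<SchemaChangeEventType> enabledEventTypes) {
           return getMetadataApplier();
       }
   }
   ```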



##
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/MetadataApplier.java:
##
@@ -19,13 +19,22 @@
 
 import org.apache.flink.cdc.common.annotation.PublicEvolving;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.SchemaChangeEventType;
+import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
 
 import java.io.Serializable;
+import java.util.Set;
 
 /** {@code MetadataApplier} is used to apply metadata changes to external 
systems. */
 @PublicEvolving
 public interface MetadataApplier extends Serializable {
 
+/** Checks if this metadata applier should handle this event type. */
+boolean acceptsSchemaEvolutionType(SchemaChangeEventType 
schemaChangeEventType);
+
+/** Checks what kind of schema change events downstream can handle. */
+Set getSupportedSchemaEvolutionTypes();
+

Review Comment:
   Please consider compatibility when changing an existing public API. Imagine 
the case where a user has a custom pipeline connector: will it still work after 
they bump the Flink CDC version?



##
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/exceptions/SchemaEvolveException.java:
##
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.exceptions;
+
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+
+import javax.annotation.Nullable;
+
+/** An exception occurred during schema evolution. */
+public class SchemaEvolveException extends Exception {

Review Comment:
   extends `FlinkRuntimeException` or `CDCRuntimeException` ?



##
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisDataSink.java:
##
@@ -74,7 +76,7 @@ public EventSinkProvider getEventSinkProvider() {
 }
 
 @Override
-public MetadataApplier getMetadataApplier() {
-return new DorisMetadataApplier(dorisOptions, configuration);
+public MetadataApplier getMetadataApplier(Set 
enabledEventTypes) {
+return new DorisMetadataApplier(dorisOptions, configuration, 
enabledEventTypes);
 }

Review Comment:
   You need to change all connectors in one PR if you introduce an incompatible change 
to a public API; we should avoid this kind of change.



##
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/SchemaChangeBehavior.java:
##
@@ -22,7 +22,9 @@
 /** Behavior for handling schema changes. */
 @PublicEvolving
 public enum SchemaChangeBehavior {
-EVOLVE,
 IGNORE,

Review Comment:
   you can add a note about the order adjustment 



##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/metrics/SchemaOperatorMetrics.java:
##
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and

Re: [PR] [FLINK-35857] fix redeploy failed deployment without latest checkpoint [flink-kubernetes-operator]

2024-07-23 Thread via GitHub


gyfora commented on code in PR #855:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/855#discussion_r1687973449


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java:
##
@@ -289,6 +289,8 @@ public boolean 
reconcileOtherChanges(FlinkResourceContext ctx) throws Except
 LOG.info("Stopping failed Flink job...");
 cleanupAfterFailedJob(ctx);
 status.setError(null);
+ReconciliationUtils.updateStatusForDeployedSpec(
+ctx.getResource(), 
ctx.getDeployConfig(ctx.getResource().getSpec()), clock);

Review Comment:
   I don't really understand how this would solve the issue...
   Could you please explain why you think it solves it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-07-23 Thread via GitHub


hlteoh37 commented on PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#issuecomment-2245153099

   @19priyadhingra The tests seem to be failing. Can we please take a look?
   
   Also, it would be good if we squashed the commits!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35885) SlicingWindowOperator should not process watermark with proctime

2024-07-23 Thread Baozhu Zhao (Jira)
Baozhu Zhao created FLINK-35885:
---

 Summary: SlicingWindowOperator should not process watermark with 
proctime
 Key: FLINK-35885
 URL: https://issues.apache.org/jira/browse/FLINK-35885
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Affects Versions: 1.17.2, 1.13.6
 Environment: flink 1.13.6 or flink 1.17.2
Reporter: Baozhu Zhao


We have discovered an unexpected case where abnormal data with a count of 0 
occurs when performing proctime window aggregation on data with a watermark. 
The SQL is as follows:

{code:sql}
CREATE TABLE s1 (
id INT,
event_time TIMESTAMP(3),
name string,
proc_time AS PROCTIME (),
WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
)
WITH
('connector' = 'my-source')
;

SELECT
*
FROM
(
SELECT
name,
COUNT(id) AS total_count,
window_start,
window_end
FROM
TABLE (
TUMBLE (
TABLE s1,
DESCRIPTOR (proc_time),
INTERVAL '30' SECONDS
)
)
GROUP BY
window_start,
window_end,
name
)
WHERE
total_count = 0;
{code}

For detailed test code, please refer to xxx



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35285) Autoscaler key group optimization can interfere with scale-down.max-factor

2024-07-23 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868069#comment-17868069
 ] 

Gyula Fora commented on FLINK-35285:


Hey [~trystan], sorry for the late reply. 
I see the problem...

Once you are close to the max parallelism you need a higher scale-down factor, 
but you generally want to keep this fairly conservative. 

One approach that we could also consider is always allowing scaling 
up/down to the nearest valid parallelism regardless of the max scaling factors. 
I think this is a bit risky and I am not too much in favour of it.

For your particular use case, it may be best to solve the issue by changing 
your max-parallelism setting from 120 to something like 720 or 1260. As long as 
your job parallelism is very small compared to the max parallelism and we have 
a lot of divisors, the algorithm has a lot of flexibility even with small scale 
factors. 

In other words, you could choose your max parallelism based on the desired scale 
factor. For small scale factors like 0.1 you will need a max parallelism that is 
high compared to the usual parallelism. You can still set a vertex max 
parallelism in the autoscaler to avoid scaling very high in practice.
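
A quick worked example of that suggestion, mimicking the divisor search quoted in 
the issue description below (illustrative code, not the operator's actual 
implementation):

{code:java}
// Returns the first parallelism >= target that evenly divides maxParallelism,
// capped at upperBound (the current parallelism during scale-down).
static int firstDivisorAtOrAbove(int target, int maxParallelism, int upperBound) {
    for (int p = target; p <= upperBound; p++) {
        if (maxParallelism % p == 0) {
            return p;
        }
    }
    return upperBound;
}

// firstDivisorAtOrAbove(48, 120, 60) -> 60: the 0.2 scale-down from 60 is lost.
// firstDivisorAtOrAbove(48, 720, 60) -> 48: with maxParallelism 720 it is honored.
{code}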

> Autoscaler key group optimization can interfere with scale-down.max-factor
> --
>
> Key: FLINK-35285
> URL: https://issues.apache.org/jira/browse/FLINK-35285
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Reporter: Trystan
>Priority: Minor
>
> When setting a less aggressive scale down limit, the key group optimization 
> can prevent a vertex from scaling down at all. It will hunt from target 
> upwards to maxParallelism/2, and will always find currentParallelism again.
>  
> A simple test trying to scale down from a parallelism of 60 with a 
> scale-down.max-factor of 0.2:
> {code:java}
> assertEquals(48, JobVertexScaler.scale(60, inputShipStrategies, 360, .8, 8, 
> 360)); {code}
>  
> It seems reasonable to make a good attempt to spread data across subtasks, 
> but not at the expense of total deadlock. The problem is that during scale 
> down it doesn't actually ensure that newParallelism will be < 
> currentParallelism. The only workaround is to set a scale down factor large 
> enough such that it finds the next lowest divisor of the maxParallelism.
>  
> Clunky, but something to ensure it can make at least some progress. There is 
> another test that now fails, but just to illustrate the point:
> {code:java}
> for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) 
> {
> if ((scaleFactor < 1 && p < currentParallelism) || (scaleFactor > 1 && p 
> > currentParallelism)) {
> if (maxParallelism % p == 0) {
> return p;
> }
> }
> } {code}
>  
> Perhaps this is by design and not a bug, but total failure to scale down in 
> order to keep optimized key groups does not seem ideal.
>  
> Key group optimization block:
> [https://github.com/apache/flink-kubernetes-operator/blob/fe3d24e4500d6fcaed55250ccc816546886fd1cf/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L296C1-L303C10]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35776] Simplify job status handling [flink-kubernetes-operator]

2024-07-23 Thread via GitHub


gyfora merged PR #851:
URL: https://github.com/apache/flink-kubernetes-operator/pull/851


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]

2024-07-23 Thread via GitHub


mateczagany commented on code in PR #821:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1688087051


##
flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobSpec.java:
##
@@ -90,8 +100,8 @@ public class JobSpec implements Diffable {
 
 /**
  * Nonce used to trigger a full redeployment of the job from the savepoint 
path specified in
- * initialSavepointPath. In order to trigger redeployment, change the 
number to a different
- * non-null value. Rollback is not possible after redeployment.
+ * initialSavepointPath or initialSavepointName. In order to trigger 
redeployment, change the

Review Comment:
   Forgot to rename it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-35776) Simplify job observe logic

2024-07-23 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-35776.
--
Resolution: Fixed

merged to main 15f648ce46537d6a1df3a87cfd653a3f855d0dcd

> Simplify job observe logic
> --
>
> Key: FLINK-35776
> URL: https://issues.apache.org/jira/browse/FLINK-35776
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.10.0
>
>
> There is a fairly complicated listing / observe logic for jobs currently that 
> is no longer necessary as we have a stable logic to always record the jobID 
> in the status before submission.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-32682] Make it possible to use query time based time functions in streaming mode [flink]

2024-07-23 Thread via GitHub


dawidwys closed pull request #23083: [FLINK-32682] Make it possible to use 
query time based time functions in streaming mode
URL: https://github.com/apache/flink/pull/23083


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35885) SlicingWindowOperator should not process watermark with proctime

2024-07-23 Thread Baozhu Zhao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Baozhu Zhao updated FLINK-35885:

Description: 
We have discovered an unexpected case where abnormal data with a count of 0 
occurs when performing proctime window aggregation on data with a watermark, 
The SQL is as follows

{code:sql}
CREATE TABLE s1 (
id INT,
event_time TIMESTAMP(3),
name string,
proc_time AS PROCTIME (),
WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
)
WITH
('connector' = 'my-source')
;

SELECT
*
FROM
(
SELECT
name,
COUNT(id) AS total_count,
window_start,
window_end
FROM
TABLE (
TUMBLE (
TABLE s1,
DESCRIPTOR (proc_time),
INTERVAL '30' SECONDS
)
)
GROUP BY
window_start,
window_end,
name
)
WHERE
total_count = 0;
{code}

For detailed test code, please refer to 
https://github.com/xingsuo-zbz/flink/blob/zbz/117/proc-agg-window-process-watermark-bug-test/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/bug/WindowBugTest.java

  was:
We have discovered an unexpected case where abnormal data with a count of 0 
occurs when performing proctime window aggregation on data with a watermark, 
The SQL is as follows

{code:sql}
CREATE TABLE s1 (
id INT,
event_time TIMESTAMP(3),
name string,
proc_time AS PROCTIME (),
WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
)
WITH
('connector' = 'my-source')
;

SELECT
*
FROM
(
SELECT
name,
COUNT(id) AS total_count,
window_start,
window_end
FROM
TABLE (
TUMBLE (
TABLE s1,
DESCRIPTOR (proc_time),
INTERVAL '30' SECONDS
)
)
GROUP BY
window_start,
window_end,
name
)
WHERE
total_count = 0;
{code}

For detailed test code, please refer to xxx

Environment: flink 1.13.6 with blink or flink 1.17.2  (was: flink 1.13.6 or 
flink 1.17.2)

> SlicingWindowOperator should not process watermark with proctime
> 
>
> Key: FLINK-35885
> URL: https://issues.apache.org/jira/browse/FLINK-35885
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.13.6, 1.17.2
> Environment: flink 1.13.6 with blink or flink 1.17.2
>Reporter: Baozhu Zhao
>Priority: Major
>
> We have discovered an unexpected case where abnormal data with a count of 0 
> occurs when performing proctime window aggregation on data with a watermark, 
> The SQL is as follows
> {code:sql}
> CREATE TABLE s1 (
> id INT,
> event_time TIMESTAMP(3),
> name string,
> proc_time AS PROCTIME (),
> WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
> )
> WITH
> ('connector' = 'my-source')
> ;
> SELECT
> *
> FROM
> (
> SELECT
> name,
> COUNT(id) AS total_count,
> window_start,
> window_end
> FROM
> TABLE (
> TUMBLE (
> TABLE s1,
> DESCRIPTOR (proc_time),
> INTERVAL '30' SECONDS
> )
> )
> GROUP BY
> window_start,
> window_end,
> name
> )
> WHERE
> total_count = 0;
> {code}
> For detailed test code, please refer to 
> https://github.com/xingsuo-zbz/flink/blob/zbz/117/proc-agg-window-process-watermark-bug-test/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/bug/WindowBugTest.java



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]

2024-07-23 Thread via GitHub


mateczagany commented on code in PR #821:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1688100790


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java:
##
@@ -426,6 +438,14 @@ public static String operatorConfigKey(String key) {
 .withDescription(
 "Max allowed checkpoint age for initiating 
last-state upgrades on running jobs. If a checkpoint is not available within 
the desired age (and nothing in progress) a savepoint will be triggered.");
 
+@Documentation.Section(SECTION_DYNAMIC)
+public static final ConfigOption 
OPERATOR_JOB_SAVEPOINT_DISPOSE_ON_DELETE =
+operatorConfig("savepoint.dispose-on-delete")
+.booleanType()
+.defaultValue(false)
+.withDescription(
+"Savepoint data for FlinkStateSnapshot resources 
created by the operator during upgrades and periodic savepoints will be 
disposed of automatically when the generated Kubernetes resource is deleted.");

Review Comment:
   I believe it's the correct way to say it (to dispose of something)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]

2024-07-23 Thread via GitHub


mateczagany commented on code in PR #821:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1688104509


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotContext.java:
##
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.controller;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
+import org.apache.flink.kubernetes.operator.api.spec.JobKind;
+import 
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+import java.util.Optional;
+
+/** Context for reconciling a snapshot. */
+@Getter
+@RequiredArgsConstructor
+public class FlinkStateSnapshotContext {
+
+private final FlinkStateSnapshot resource;
+private final FlinkStateSnapshotStatus originalStatus;
+private final Context josdkContext;
+private final FlinkConfigManager configManager;
+
+private FlinkOperatorConfiguration operatorConfig;
+private Configuration referencedJobObserveConfig;
+private FlinkDeployment referencedJobFlinkDeployment;
+
+/**
+ * @return Operator configuration for this resource.
+ */
+public FlinkOperatorConfiguration getOperatorConfig() {
+if (operatorConfig != null) {
+return operatorConfig;
+}
+return operatorConfig =
+configManager.getOperatorConfiguration(
+getResource().getMetadata().getNamespace(), null);

Review Comment:
   I have flipped the ifs for now; I will come back to check whether a Lombok lazy 
getter can be used here.
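
   For reference, a minimal sketch of the Lombok lazy-getter variant (illustrative only; the class name and the `computeOperatorConfig` helper are made up, and it assumes the namespace-based lookup from this class stays unchanged):

   ```java
   import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
   import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
   import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;

   import lombok.Getter;
   import lombok.RequiredArgsConstructor;

   @RequiredArgsConstructor
   public class FlinkStateSnapshotContextSketch {

       private final FlinkStateSnapshot resource;
       private final FlinkConfigManager configManager;

       // Lombok generates a thread-safe getOperatorConfig() that evaluates the
       // initializer at most once, on first access.
       @Getter(lazy = true)
       private final FlinkOperatorConfiguration operatorConfig = computeOperatorConfig();

       private FlinkOperatorConfiguration computeOperatorConfig() {
           return configManager.getOperatorConfiguration(
                   resource.getMetadata().getNamespace(), null);
       }
   }
   ```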



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35292] Set dummy savepoint path during last-state upgrade [flink-kubernetes-operator]

2024-07-23 Thread via GitHub


gyfora commented on code in PR #849:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/849#discussion_r1688103240


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java:
##
@@ -265,7 +274,7 @@ protected void restoreJob(
 throws Exception {
 Optional savepointOpt = Optional.empty();
 
-if (spec.getJob().getUpgradeMode() != UpgradeMode.STATELESS) {
+if (spec.getJob().getUpgradeMode() == UpgradeMode.SAVEPOINT) {

Review Comment:
   The problem is that this cannot be easily done, as the savepoint that is passed 
there depends on where / why deploy is called. It is, for example, 
different during initial deployments vs upgrades.
   
   While the optional parameter is not great, I would like to avoid adding more 
complexity to the deploy method. It may make sense to actually break up the 
deploy method into 3 parts (stateless / last-state / savepoint), which would get rid of 
the optional params and the requireHaMetadata flag at the same time, at the 
expense of more methods.
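   
   Purely as an illustration of that idea (the interface name, generics and signatures below are made up, not the actual operator API), the split could look roughly like:

   ```java
   /**
    * Illustrative sketch only: one method per deployment mode, so callers no longer
    * need an optional savepoint parameter or a requireHaMetadata flag.
    */
   public interface JobDeployer<CTX, SPEC> {

       /** Deploy the job with empty state. */
       void deployStateless(CTX ctx, SPEC spec) throws Exception;

       /** Deploy the job from Kubernetes HA metadata (last-state style upgrade). */
       void deployLastState(CTX ctx, SPEC spec) throws Exception;

       /** Deploy the job from an explicit savepoint path. */
       void deployFromSavepoint(CTX ctx, SPEC spec, String savepointPath) throws Exception;
   }
   ```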
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-35292) Set dummy savepoint path during last-state upgrade

2024-07-23 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35292?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-35292.
--
Resolution: Fixed

merged to main 207b149f9b556d68c4ab98d16cdde2f7820659c0

> Set dummy savepoint path during last-state upgrade
> --
>
> Key: FLINK-35292
> URL: https://issues.apache.org/jira/browse/FLINK-35292
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Major
>  Labels: pull-request-available
>
> Currently the operator always sets the savepoint path even if last-state (HA 
> metadata) must be used. 
> This can be misleading to users, as the set savepoint path normally should 
> never take effect and can actually lead to incorrect state being restored if the 
> HA metadata is deleted by the user at the wrong moment. 
> To avoid this we can set an explicit dummy savepoint path which will prevent 
> restoring from it accidentally. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]

2024-07-23 Thread via GitHub


mateczagany commented on code in PR #821:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1688126716


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotController.java:
##
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.controller;
+
+import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
+import 
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus;
+import 
org.apache.flink.kubernetes.operator.observer.snapshot.StateSnapshotObserver;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import 
org.apache.flink.kubernetes.operator.reconciler.snapshot.StateSnapshotReconciler;
+import 
org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.EventSourceUtils;
+import org.apache.flink.kubernetes.operator.utils.FlinkStateSnapshotUtils;
+import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
+import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
+
+import io.javaoperatorsdk.operator.api.reconciler.Cleaner;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler;
+import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl;
+import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
+import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
+import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+import io.javaoperatorsdk.operator.processing.event.source.EventSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/** Controller that runs the main reconcile loop for {@link 
FlinkStateSnapshot}. */
+@ControllerConfiguration
+public class FlinkStateSnapshotController
+implements Reconciler,
+ErrorStatusHandler,
+EventSourceInitializer,
+Cleaner {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(FlinkStateSnapshotController.class);
+
+private final Set validators;
+private final FlinkResourceContextFactory ctxFactory;
+private final StateSnapshotReconciler reconciler;
+private final StateSnapshotObserver observer;
+private final EventRecorder eventRecorder;
+private final StatusRecorder 
statusRecorder;
+
+public FlinkStateSnapshotController(
+Set validators,
+FlinkResourceContextFactory ctxFactory,
+StateSnapshotReconciler reconciler,
+StateSnapshotObserver observer,
+EventRecorder eventRecorder,
+StatusRecorder 
statusRecorder) {
+this.validators = validators;
+this.ctxFactory = ctxFactory;
+this.reconciler = reconciler;
+this.observer = observer;
+this.eventRecorder = eventRecorder;
+this.statusRecorder = statusRecorder;
+}
+
+@Override
+public UpdateControl reconcile(
+FlinkStateSnapshot flinkStateSnapshot, Context 
josdkContext) {
+// status might be null here
+flinkStateSnapshot.setStatus(
+Objects.requireNonNullElseGet(
+flinkStateSnapshot.getStatus(), 
FlinkStateSnapshotStatus::new));
+var ctx = ctxFactory.getFlinkStateSnapshotContext(flinkStateSnapshot, 
josdkContext);
+
+observer.observe(ctx);
+
+if (validateSnapshot(ctx)) {
+reconciler.reconcile(ctx);
+}
+
+notifyListeners(ctx);
+return getUpdateControl(ctx);
+}
+
+@Override
+public DeleteControl cleanup(
+FlinkStateSnapshot flinkStateSnapshot, Context 
josdkContext) {
+var ctx 

Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]

2024-07-23 Thread via GitHub


mateczagany commented on code in PR #821:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1688136504


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java:
##
@@ -273,15 +280,19 @@ private void disposeSavepointQuietly(
 }
 }
 
-private void observeLatestSavepoint(
+private void observeLatestCheckpoint(

Review Comment:
   Yep, this will be the latest checkpoint retrieved via the Flink REST API; I have 
updated the log.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35857] fix redeploy failed deployment without latest checkpoint [flink-kubernetes-operator]

2024-07-23 Thread via GitHub


chenyuzhi459 commented on code in PR #855:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/855#discussion_r1688138227


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java:
##
@@ -289,6 +289,8 @@ public boolean 
reconcileOtherChanges(FlinkResourceContext ctx) throws Except
 LOG.info("Stopping failed Flink job...");
 cleanupAfterFailedJob(ctx);
 status.setError(null);
+ReconciliationUtils.updateStatusForDeployedSpec(
+ctx.getResource(), 
ctx.getDeployConfig(ctx.getResource().getSpec()), clock);

Review Comment:
   Assume a flink deployment is submitted to the flink-kubernetes-operator for 
the first time with the following settings
   
   ```
   spec.job.upgradeMode=savepoint
   spec.job.initialSavepointPath=null
   spec.flinkConfiguration.execution.checkpointing.interval=60s
   ```
   
   Then let me walk through the startup and failover process of the 
flink-kubernetes-operator as I understand it:
   
   1. At the first reconcile, in the method 
[AbstractFlinkResourceReconciler.updateStatusBeforeFirstDeployment](https://github.com/apache/flink-kubernetes-operator/blob/29076c80eaac5547e3d12f703c43780cd4a52dad/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L224), 
the `spec.job.upgradeMode` of the cloned deployment will be set to STATELESS 
because `spec.job.initialSavepointPath` is empty (this is not synchronized back 
to the original deployment in k8s, so the original deployment's 
spec.job.upgradeMode is still SAVEPOINT), and it will be serialized into 
`status.reconciliationStatus.lastReconciledSpec` (this step is synchronized to 
the original deployment in k8s; I haven't yet studied why this happens).
   
   
   2. After running for a period of time, the deployment may encounter a 
problem and exit with a failed status. The operator will save the latest 
checkpoint in `status.jobStatus.savepointInfo.lastSavepoint` of the original 
deployment in the method `SnapshotObserver.observeSavepointStatus`.
   
   3. Then in the method 
[AbstractJobReconciler.resubmitJob](https://github.com/apache/flink-kubernetes-operator/blob/29076c80eaac5547e3d12f703c43780cd4a52dad/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java#L315), 
the lastReconciledSpec of the original deployment will be read into the specToRecover 
variable and passed to the method 
[AbstractJobReconciler.restore](https://github.com/apache/flink-kubernetes-operator/blob/29076c80eaac5547e3d12f703c43780cd4a52dad/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java#L260), 
which decides whether to recover from lastSavepoint based on whether the 
`spec.job.upgradeMode` of the specToRecover variable is STATELESS. Before this 
fix, the upgradeMode here is obviously STATELESS.
   
   
   
   Therefore, in the failover scenario, I think just serializing the original 
deployment's `spec.job.upgradeMode=SAVEPOINT` into 
`status.reconciliationStatus.lastReconciledSpec` before resubmitJob can solve 
this problem.
   
   I don’t know if there is something wrong with my understanding. If so, I 
hope you can correct me. Thank you.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35857] fix redeploy failed deployment without latest checkpoint [flink-kubernetes-operator]

2024-07-23 Thread via GitHub


gyfora commented on code in PR #855:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/855#discussion_r1688196270


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java:
##
@@ -289,6 +289,8 @@ public boolean 
reconcileOtherChanges(FlinkResourceContext ctx) throws Except
 LOG.info("Stopping failed Flink job...");
 cleanupAfterFailedJob(ctx);
 status.setError(null);
+ReconciliationUtils.updateStatusForDeployedSpec(
+ctx.getResource(), 
ctx.getDeployConfig(ctx.getResource().getSpec()), clock);

Review Comment:
   Hm I see, I think you are right. I think the more straightforward and 
simpler fix would be to actually fix the `resubmitJob` method. In the case where we 
have a savepoint, it should set the upgradeMode to SAVEPOINT on the spec to 
recover; otherwise it can leave it STATELESS. What do you think?
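   
   A minimal, purely illustrative sketch of that decision (not the actual operator code; the real change would live inside `resubmitJob` and use the operator's own spec and savepoint types):

   ```java
   import java.util.Optional;

   // Illustrative only: when resubmitting a failed job, the spec we recover with
   // should not be stateless if there is actually state to restore from.
   public class ResubmitUpgradeModeSketch {

       enum UpgradeMode { STATELESS, LAST_STATE, SAVEPOINT }

       static UpgradeMode upgradeModeForResubmit(Optional<String> lastSavepointPath) {
           // If a savepoint/checkpoint is known, restore from it; otherwise fall back
           // to a stateless resubmission.
           return lastSavepointPath.isPresent() ? UpgradeMode.SAVEPOINT : UpgradeMode.STATELESS;
       }

       public static void main(String[] args) {
           System.out.println(upgradeModeForResubmit(Optional.of("s3://bucket/savepoint-123")));
           System.out.println(upgradeModeForResubmit(Optional.empty()));
       }
   }
   ```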



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35272][cdc][runtime] Pipeline Transform job supports omitting / renaming calculation column [flink-cdc]

2024-07-23 Thread via GitHub


yuxiqian commented on PR #3285:
URL: https://github.com/apache/flink-cdc/pull/3285#issuecomment-2245470485

   Done, rebased due to some conflicts with 
https://github.com/apache/flink-cdc/commit/26ff6d2a081181f3df7aa49d65d804c57c634122.
 Will add `CAST ... AS` tests after #3357 is merged.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]

2024-07-23 Thread via GitHub


mateczagany commented on code in PR #821:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1688221106


##
flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkStateSnapshotState.java:
##
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.api.status;
+
+import org.apache.flink.annotation.Experimental;
+
+/** Describes current snapshot state. */
+@Experimental
+public enum FlinkStateSnapshotState {

Review Comment:
   Good idea, thank you!
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35285) Autoscaler key group optimization can interfere with scale-down.max-factor

2024-07-23 Thread Trystan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868093#comment-17868093
 ] 

Trystan commented on FLINK-35285:
-

 
{noformat}
As long as your job parallelism is very small compared to the max parallelism 
and we have a lot of divisors the algorithm has a lot of flexibility even with 
small scale factors. {noformat}
 

 

Yes, I agree this makes sense. Pairing it with vertex max and a high overall 
max-parallelism could essentially trick the current algo into working.

 

I would argue that a current parallelism 40 is not very close to the max 
parallelism of 120, though. Maybe our patterns are outside the norm? But to me 
this seems well within a "normal" range.

 

Is there any reason why we wouldn't want to adjust the algorithm? To my eyes, 
it has a flaw in that when a scale is _requested_ it may not _actually_ scale 
because it does not take into account the current bounds, i.e.
{noformat}
On scale down, ensure that p < currentParallelism and on scale up p > 
currentParallelism.{noformat}
Without such a check, it is very likely that the loop in question will find p 
== currentParallelism and then maxParallelism % p == 0 will return true, 
resulting in no action being taken. Looking at the goals of the algorithm, it 
seems designed to _try its best_ to find a p such that [max % p == 
0|https://github.com/apache/flink-kubernetes-operator/blob/fe3d24e4500d6fcaed55250ccc816546886fd1cf/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L296-L303],
 but if it fails it should still return p 
([here|[https://github.com/apache/flink-kubernetes-operator/blob/fe3d24e4500d6fcaed55250ccc816546886fd1cf/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L296-L303]).]
 I think a simple check ensuring that p != currentParallelism could let it 
optimize without deadlocking.

 

Or perhaps I'm misunderstanding the goal. I would be happy to send a PR over 
with a slightly tweaked algorithm if you're open to adjusting this slightly.

> Autoscaler key group optimization can interfere with scale-down.max-factor
> --
>
> Key: FLINK-35285
> URL: https://issues.apache.org/jira/browse/FLINK-35285
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Reporter: Trystan
>Priority: Minor
>
> When setting a less aggressive scale down limit, the key group optimization 
> can prevent a vertex from scaling down at all. It will hunt from target 
> upwards to maxParallelism/2, and will always find currentParallelism again.
>  
> A simple test trying to scale down from a parallelism of 60 with a 
> scale-down.max-factor of 0.2:
> {code:java}
> assertEquals(48, JobVertexScaler.scale(60, inputShipStrategies, 360, .8, 8, 
> 360)); {code}
>  
> It seems reasonable to make a good attempt to spread data across subtasks, 
> but not at the expense of total deadlock. The problem is that during scale 
> down it doesn't actually ensure that newParallelism will be < 
> currentParallelism. The only workaround is to set a scale down factor large 
> enough such that it finds the next lowest divisor of the maxParallelism.
>  
> Clunky, but something to ensure it can make at least some progress. There is 
> another test that now fails, but just to illustrate the point:
> {code:java}
> for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) 
> {
> if ((scaleFactor < 1 && p < currentParallelism) || (scaleFactor > 1 && p 
> > currentParallelism)) {
> if (maxParallelism % p == 0) {
> return p;
> }
> }
> } {code}
>  
> Perhaps this is by design and not a bug, but total failure to scale down in 
> order to keep optimized key groups does not seem ideal.
>  
> Key group optimization block:
> [https://github.com/apache/flink-kubernetes-operator/blob/fe3d24e4500d6fcaed55250ccc816546886fd1cf/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L296C1-L303C10]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35285) Autoscaler key group optimization can interfere with scale-down.max-factor

2024-07-23 Thread Trystan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868093#comment-17868093
 ] 

Trystan edited comment on FLINK-35285 at 7/23/24 3:00 PM:
--

{noformat}
As long as your job parallelism is very small compared to the max parallelism 
and we have a lot of divisors the algorithm has a lot of flexibility even with 
small scale factors. {noformat}
Yes, I agree this makes sense. Pairing it with vertex max and a high overall 
max-parallelism could essentially trick the current algo into working.

I would argue that a current parallelism 40 is not very close to the max 
parallelism of 120, though. Maybe our patterns are outside the norm? But to me 
this seems well within a "normal" range.

 

Is there any reason why we wouldn't want to adjust the algorithm? To my eyes, 
it has a flaw in that when a scale is _requested_ it may not _actually_ scale 
because it does not take into account the current bounds, i.e.
{noformat}
On scale down, ensure that p < currentParallelism and on scale up p > 
currentParallelism.{noformat}
Without such a check, it is very likely that the loop in question will find p 
== currentParallelism and then maxParallelism % p == 0 will return true, 
resulting in no action being taken. Looking at the goals of the algorithm, it 
seems designed to _try its best_ to find a p such that [max % p == 
0|https://github.com/apache/flink-kubernetes-operator/blob/fe3d24e4500d6fcaed55250ccc816546886fd1cf/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L296-L303],
 but if it fails it should still return p ([here|#L296-L303]).]) I think a 
simple check ensuring that p != currentParallelism could let it optimize 
without deadlocking.

 

Or perhaps I'm misunderstanding the goal. I would be happy to send a PR over 
with a slightly tweaked algorithm if you're open to adjusting this slightly.


was (Author: trystan):
 
{noformat}
As long as your job parallelism is very small compared to the max parallelism 
and we have a lot of divisors the algorithm has a lot of flexibility even with 
small scale factors. {noformat}
 

 

Yes, I agree this makes sense. Pairing it with vertex max and a high overall 
max-parallelism could essentially trick the current algo into working.

 

I would argue that a current parallelism 40 is not very close to the max 
parallelism of 120, though. Maybe our patterns are outside the norm? But to me 
this seems well within a "normal" range.

 

Is there any reason why we wouldn't want to adjust the algorithm? To my eyes, 
it has a flaw in that when a scale is _requested_ it may not _actually_ scale 
because it does not take into account the current bounds, i.e.
{noformat}
On scale down, ensure that p < currentParallelism and on scale up p > 
currentParallelism.{noformat}
Without such a check, it is very likely that the loop in question will find p 
== currentParallelism and then maxParallelism % p == 0 will return true, 
resulting in no action being taken. Looking at the goals of the algorithm, it 
seems designed to _try its best_ to find a p such that [max % p == 
0|https://github.com/apache/flink-kubernetes-operator/blob/fe3d24e4500d6fcaed55250ccc816546886fd1cf/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L296-L303],
 but if it fails it should still return p 
([here|[https://github.com/apache/flink-kubernetes-operator/blob/fe3d24e4500d6fcaed55250ccc816546886fd1cf/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L296-L303]).]
 I think a simple check ensuring that p != currentParallelism could let it 
optimize without deadlocking.

 

Or perhaps I'm misunderstanding the goal. I would be happy to send a PR over 
with a slightly tweaked algorithm if you're open to adjusting this slightly.

> Autoscaler key group optimization can interfere with scale-down.max-factor
> --
>
> Key: FLINK-35285
> URL: https://issues.apache.org/jira/browse/FLINK-35285
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Reporter: Trystan
>Priority: Minor
>
> When setting a less aggressive scale down limit, the key group optimization 
> can prevent a vertex from scaling down at all. It will hunt from target 
> upwards to maxParallelism/2, and will always find currentParallelism again.
>  
> A simple test trying to scale down from a parallelism of 60 with a 
> scale-down.max-factor of 0.2:
> {code:java}
> assertEquals(48, JobVertexScaler.scale(60, inputShipStrategies, 360, .8, 8, 
> 360)); {code}
>  
> It seems reasonable to make a good attempt to spread data across subtasks, 
> but not at the expense of total deadlock. The problem is that during scale 
> down it doesn't actually ensure that newPar

[PR] [FLINK-22748][connector-kafka] Allow dynamic target topic selection in SQL Kafka sinks [flink-connector-kafka]

2024-07-23 Thread via GitHub


klam-shop opened a new pull request, #109:
URL: https://github.com/apache/flink-connector-kafka/pull/109

   ## What is the purpose of the change
   Allows writing to different Kafka topics based on the `topic` metadata 
column value in SQL, and updates the Table API's `KafkaDynamicSink` to accept a 
`List topics` instead of `String topic`. The list acts as a whitelist 
of acceptable values for the `topic` metadata column: 
   
   - If a single topic is provided, it is used by default for the target topic 
to produce to
   - If a list is provided, only that list of topics can be produced to
   - If no list is provided, any value is accepted
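   
   A rough sketch of how this could look from a user's perspective if the PR is merged as described (illustrative only: the writable `topic` metadata key, the semicolon-separated `topic` whitelist option and all table/topic names are assumptions that mirror this description and the source side):
   
   ```java
   import org.apache.flink.table.api.EnvironmentSettings;
   import org.apache.flink.table.api.TableEnvironment;

   public class DynamicTopicSinkSketch {
       public static void main(String[] args) {
           TableEnvironment tEnv =
                   TableEnvironment.create(EnvironmentSettings.inStreamingMode());

           // Dummy source, only to make the example self-contained.
           tEnv.executeSql(
                   "CREATE TABLE orders_source ("
                           + "  order_id STRING,"
                           + "  amount DOUBLE,"
                           + "  region STRING"
                           + ") WITH ("
                           + "  'connector' = 'datagen',"
                           + "  'number-of-rows' = '10'"
                           + ")");

           // Kafka sink with `topic` exposed as writable metadata and a two-topic whitelist.
           tEnv.executeSql(
                   "CREATE TABLE orders_sink ("
                           + "  order_id STRING,"
                           + "  amount DOUBLE,"
                           + "  target_topic STRING METADATA FROM 'topic'"
                           + ") WITH ("
                           + "  'connector' = 'kafka',"
                           + "  'topic' = 'orders_eu;orders_us',"
                           + "  'properties.bootstrap.servers' = 'localhost:9092',"
                           + "  'format' = 'json'"
                           + ")");

           // Each row picks its own target topic; a value outside the whitelist is rejected.
           tEnv.executeSql(
                   "INSERT INTO orders_sink "
                           + "SELECT order_id, amount, "
                           + "  IF(region = 'EU', 'orders_eu', 'orders_us') "
                           + "FROM orders_source");
       }
   }
   ```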
   
   Builds on the work in https://github.com/apache/flink/pull/16142 by 
@SteNicholas 
   
   ## Brief change log
   - Adds `topic` as writable metadata
   - Updates Table API Kafka sink to accept `List<String> topics` instead of 
`String topic`, mirroring the source side.
   - Implements whitelist behaviour in `DynamicKafkaRecordSerializationSchema`
   
   ## Verifying this change
   - [x] Tests that the Sink Factory and related machinery works as expected
   - [x] Tests the various valid and invalid scenarios for `topic` metadata in 
`DynamicKafkaRecordSerializationSchema`
   
   ## Does this pull request potentially affect one of the following parts:
   
   - Dependencies (does it add or upgrade a dependency): no
   - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
   - The serializers: yes
   - The runtime per-record code paths (performance sensitive): yes
   - Anything that affects deployment or recovery: no
   - The S3 file system connector: no
   
   ## Documentation
   
   - Does this pull request introduce a new feature? yes
   - If yes, how is the feature documented? updated documentation 
   
   
   
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-22748][connector-kafka] Allow dynamic target topic selection in SQL Kafka sinks [flink]

2024-07-23 Thread via GitHub


klam-shop commented on PR #16142:
URL: https://github.com/apache/flink/pull/16142#issuecomment-2245500551

   Hi I've taken some time to build upon [Nicholas 
Jiang](https://issues.apache.org/jira/secure/ViewProfile.jspa?name=nicholasjiang)
 's PR and port it to the new kafka connector repo:
   
   https://github.com/apache/flink-connector-kafka/pull/109
   
   Would appreciate feedback on this approach, let me know.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-35285) Autoscaler key group optimization can interfere with scale-down.max-factor

2024-07-23 Thread Trystan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868093#comment-17868093
 ] 

Trystan edited comment on FLINK-35285 at 7/23/24 3:02 PM:
--

{noformat}
As long as your job parallelism is very small compared to the max parallelism 
and we have a lot of divisors the algorithm has a lot of flexibility even with 
small scale factors. {noformat}
Yes, I agree this makes sense. Pairing it with vertex max and a high overall 
max-parallelism could essentially trick the current algo into working.

I would argue that a current parallelism 40 is not very close to the max 
parallelism of 120, though. Maybe our patterns are outside the norm? But to me 
this seems well within a "normal" range.

 

Is there any reason why we wouldn't want to adjust the algorithm? To my eyes, 
it has a flaw in that when a scale is _requested_ it may not _actually_ scale 
because it does not take into account the current bounds, i.e.
{noformat}
On scale down, ensure that p < currentParallelism and on scale up p > 
currentParallelism.{noformat}
Without such a check, it is very likely that the loop in question will find p 
== currentParallelism and then maxParallelism % p == 0 will return true, 
resulting in no action being taken. Looking at the goals of the algorithm, it 
seems designed to _try its best_ to find a p such that [max % p == 
0|https://github.com/apache/flink-kubernetes-operator/blob/fe3d24e4500d6fcaed55250ccc816546886fd1cf/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L296-L303],
 but if it fails it should still return p ([here|#L296-L303]). I think a simple 
check ensuring that p != currentParallelism could let it optimize without 
deadlocking.

 

Or perhaps I'm misunderstanding the goal. I would be happy to send a PR over 
with a slightly tweaked algorithm if you're open to adjusting this slightly.


was (Author: trystan):
{noformat}
As long as your job parallelism is very small compared to the max parallelism 
and we have a lot of divisors the algorithm has a lot of flexibility even with 
small scale factors. {noformat}
Yes, I agree this makes sense. Pairing it with vertex max and a high overall 
max-parallelism could essentially trick the current algo into working.

I would argue that a current parallelism 40 is not very close to the max 
parallelism of 120, though. Maybe our patterns are outside the norm? But to me 
this seems well within a "normal" range.

 

Is there any reason why we wouldn't want to adjust the algorithm? To my eyes, 
it has a flaw in that when a scale is _requested_ it may not _actually_ scale 
because it does not take into account the current bounds, i.e.
{noformat}
On scale down, ensure that p < currentParallelism and on scale up p > 
currentParallelism.{noformat}
Without such a check, it is very likely that the loop in question will find p 
== currentParallelism and then maxParallelism % p == 0 will return true, 
resulting in no action being taken. Looking at the goals of the algorithm, it 
seems designed to _try its best_ to find a p such that [max % p == 
0|https://github.com/apache/flink-kubernetes-operator/blob/fe3d24e4500d6fcaed55250ccc816546886fd1cf/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L296-L303],
 but if it fails it should still return p ([here|#L296-L303]).]) I think a 
simple check ensuring that p != currentParallelism could let it optimize 
without deadlocking.

 

Or perhaps I'm misunderstanding the goal. I would be happy to send a PR over 
with a slightly tweaked algorithm if you're open to adjusting this slightly.

> Autoscaler key group optimization can interfere with scale-down.max-factor
> --
>
> Key: FLINK-35285
> URL: https://issues.apache.org/jira/browse/FLINK-35285
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Reporter: Trystan
>Priority: Minor
>
> When setting a less aggressive scale down limit, the key group optimization 
> can prevent a vertex from scaling down at all. It will hunt from target 
> upwards to maxParallelism/2, and will always find currentParallelism again.
>  
> A simple test trying to scale down from a parallelism of 60 with a 
> scale-down.max-factor of 0.2:
> {code:java}
> assertEquals(48, JobVertexScaler.scale(60, inputShipStrategies, 360, .8, 8, 
> 360)); {code}
>  
> It seems reasonable to make a good attempt to spread data across subtasks, 
> but not at the expense of total deadlock. The problem is that during scale 
> down it doesn't actually ensure that newParallelism will be < 
> currentParallelism. The only workaround is to set a scale down factor large 
> enough such that it finds the next lowest divisor of the maxParallelism.
>  
> Clunky, but

[jira] [Comment Edited] (FLINK-35285) Autoscaler key group optimization can interfere with scale-down.max-factor

2024-07-23 Thread Trystan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868093#comment-17868093
 ] 

Trystan edited comment on FLINK-35285 at 7/23/24 3:03 PM:
--

{noformat}
As long as your job parallelism is very small compared to the max parallelism 
and we have a lot of divisors the algorithm has a lot of flexibility even with 
small scale factors. {noformat}
Yes, I agree this makes sense. Pairing it with vertex max and a high overall 
max-parallelism could essentially trick the current algo into working.

I would argue that a current parallelism 40 is not very close to the max 
parallelism of 120, though. Maybe our patterns are outside the norm? But to me 
this seems well within a "normal" range.

 

Is there any reason why we wouldn't want to adjust the algorithm? To my eyes, 
it has a flaw in that when a scale is _requested_ it may not _actually_ scale 
because it does not take into account the current bounds, i.e.
{noformat}
On scale down, ensure that p < currentParallelism and on scale up p > 
currentParallelism.{noformat}
Without such a check, it is very likely that the loop in question will find p 
== currentParallelism and then maxParallelism % p == 0 will return true, 
resulting in no action being taken. Looking at the goals of the algorithm, it 
seems designed to _try its best_ to find a p such that [max % p == 
0|https://github.com/apache/flink-kubernetes-operator/blob/fe3d24e4500d6fcaed55250ccc816546886fd1cf/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L296-L303],
 but if it fails it should still return p 
([here|https://github.com/apache/flink-kubernetes-operator/blob/fe3d24e4500d6fcaed55250ccc816546886fd1cf/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L305-L306]).
 I think a simple check ensuring that p != currentParallelism could let it 
optimize without deadlocking.

 

Or perhaps I'm misunderstanding the goal. I would be happy to send a PR over 
with a slightly tweaked algorithm if you're open to adjusting this slightly.


was (Author: trystan):
{noformat}
As long as your job parallelism is very small compared to the max parallelism 
and we have a lot of divisors the algorithm has a lot of flexibility even with 
small scale factors. {noformat}
Yes, I agree this makes sense. Pairing it with vertex max and a high overall 
max-parallelism could essentially trick the current algo into working.

I would argue that a current parallelism 40 is not very close to the max 
parallelism of 120, though. Maybe our patterns are outside the norm? But to me 
this seems well within a "normal" range.

 

Is there any reason why we wouldn't want to adjust the algorithm? To my eyes, 
it has a flaw in that when a scale is _requested_ it may not _actually_ scale 
because it does not take into account the current bounds, i.e.
{noformat}
On scale down, ensure that p < currentParallelism and on scale up p > 
currentParallelism.{noformat}
Without such a check, it is very likely that the loop in question will find p 
== currentParallelism and then maxParallelism % p == 0 will return true, 
resulting in no action being taken. Looking at the goals of the algorithm, it 
seems designed to _try its best_ to find a p such that [max % p == 
0|https://github.com/apache/flink-kubernetes-operator/blob/fe3d24e4500d6fcaed55250ccc816546886fd1cf/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L296-L303],
 but if it fails it should still return p ([here|#L296-L303]). I think a simple 
check ensuring that p != currentParallelism could let it optimize without 
deadlocking.

 

Or perhaps I'm misunderstanding the goal. I would be happy to send a PR over 
with a slightly tweaked algorithm if you're open to adjusting this slightly.

> Autoscaler key group optimization can interfere with scale-down.max-factor
> --
>
> Key: FLINK-35285
> URL: https://issues.apache.org/jira/browse/FLINK-35285
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Reporter: Trystan
>Priority: Minor
>
> When setting a less aggressive scale down limit, the key group optimization 
> can prevent a vertex from scaling down at all. It will hunt from target 
> upwards to maxParallelism/2, and will always find currentParallelism again.
>  
> A simple test trying to scale down from a parallelism of 60 with a 
> scale-down.max-factor of 0.2:
> {code:java}
> assertEquals(48, JobVertexScaler.scale(60, inputShipStrategies, 360, .8, 8, 
> 360)); {code}
>  
> It seems reasonable to make a good attempt to spread data across subtasks, 
> but not at the expense of total deadlock. The problem is that during scale 
> down it doesn't actually ensure that newParallelism will b

[jira] [Commented] (FLINK-22748) Allow dynamic target topic selection in SQL Kafka sinks

2024-07-23 Thread Kevin Lam (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868094#comment-17868094
 ] 

Kevin Lam commented on FLINK-22748:
---

Hi I've taken some time to build upon [~nicholasjiang] 's PR and port it to the 
new kafka connector repo:

[https://github.com/apache/flink-connector-kafka/pull/109]

Would appreciate feedback on this approach, let me know.

> Allow dynamic target topic selection in SQL Kafka sinks
> ---
>
> Key: FLINK-22748
> URL: https://issues.apache.org/jira/browse/FLINK-22748
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka, Table SQL / Ecosystem
>Reporter: Timo Walther
>Assignee: Nicholas Jiang
>Priority: Major
>  Labels: pull-request-available, stale-assigned, starter
>
> We should allow writing to different Kafka topics based on some column value 
> in SQL.
> The existing implementation can be easily adapted for that. The "target 
> topic" would be an additional persisted metadata column in SQL terms. All one 
> needs to do is to adapt
> DynamicKafkaSerializationSchema
> KafkaDynamicSink
> We should guard this dynamic behavior via a config option and make the topic 
> option optional in this case.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35285) Autoscaler key group optimization can interfere with scale-down.max-factor

2024-07-23 Thread Trystan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868093#comment-17868093
 ] 

Trystan edited comment on FLINK-35285 at 7/23/24 3:03 PM:
--

{noformat}
As long as your job parallelism is very small compared to the max parallelism 
and we have a lot of divisors the algorithm has a lot of flexibility even with 
small scale factors. {noformat}
Yes, I agree this makes sense. Pairing it with vertex max and a high overall 
max-parallelism could essentially trick the current algo into working.

I would argue that a current parallelism 40 is not very close to the max 
parallelism of 120, though. Maybe our patterns are outside the norm? But to me 
this seems well within a "normal" range.

 

Is there any reason why we wouldn't want to adjust the algorithm? To my eyes, 
it has a flaw in that when a scale is _requested_ it may not _actually_ scale 
because it does not take into account the current bounds, i.e.
{noformat}
On scale down, ensure that p < currentParallelism and on scale up p > 
currentParallelism.{noformat}
Without such a check, it is very likely that the loop in question will find p 
== currentParallelism and then maxParallelism % p == 0 will return true, 
resulting in no action being taken. Looking at the goals of the algorithm, it 
seems designed to _try its best_ to find a p such that [max % p == 
0|https://github.com/apache/flink-kubernetes-operator/blob/fe3d24e4500d6fcaed55250ccc816546886fd1cf/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L296-L303],
 but if it fails it should still return p 
([here|https://github.com/apache/flink-kubernetes-operator/blob/fe3d24e4500d6fcaed55250ccc816546886fd1cf/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L305-L306]).
 I think a simple check ensuring that p != currentParallelism in the keygroup 
optimization loop could let it optimize without deadlocking.

 

Or perhaps I'm misunderstanding the goal. I would be happy to send a PR over 
with a slightly tweaked algorithm if you're open to adjusting this slightly.


was (Author: trystan):
{noformat}
As long as your job parallelism is very small compared to the max parallelism 
and we have a lot of divisors the algorithm has a lot of flexibility even with 
small scale factors. {noformat}
Yes, I agree this makes sense. Pairing it with vertex max and a high overall 
max-parallelism could essentially trick the current algo into working.

I would argue that a current parallelism 40 is not very close to the max 
parallelism of 120, though. Maybe our patterns are outside the norm? But to me 
this seems well within a "normal" range.

 

Is there any reason why we wouldn't want to adjust the algorithm? To my eyes, 
it has a flaw in that when a scale is _requested_ it may not _actually_ scale 
because it does not take into account the current bounds, i.e.
{noformat}
On scale down, ensure that p < currentParallelism and on scale up p > 
currentParallelism.{noformat}
Without such a check, it is very likely that the loop in question will find p 
== currentParallelism and then maxParallelism % p == 0 will return true, 
resulting in no action being taken. Looking at the goals of the algorithm, it 
seems designed to _try its best_ to find a p such that [max % p == 
0|https://github.com/apache/flink-kubernetes-operator/blob/fe3d24e4500d6fcaed55250ccc816546886fd1cf/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L296-L303],
 but if it fails it should still return p 
([here|https://github.com/apache/flink-kubernetes-operator/blob/fe3d24e4500d6fcaed55250ccc816546886fd1cf/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L305-L306]).
 I think a simple check ensuring that p != currentParallelism could let it 
optimize without deadlocking.

 

Or perhaps I'm misunderstanding the goal. I would be happy to send a PR over 
with a slightly tweaked algorithm if you're open to adjusting this slightly.

> Autoscaler key group optimization can interfere with scale-down.max-factor
> --
>
> Key: FLINK-35285
> URL: https://issues.apache.org/jira/browse/FLINK-35285
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Reporter: Trystan
>Priority: Minor
>
> When setting a less aggressive scale down limit, the key group optimization 
> can prevent a vertex from scaling down at all. It will hunt from target 
> upwards to maxParallelism/2, and will always find currentParallelism again.
>  
> A simple test trying to scale down from a parallelism of 60 with a 
> scale-down.max-factor of 0.2:
> {code:java}
> assertEquals(48, JobVertexScaler.scale(60, inputShipStrategies, 360, .8, 8, 
> 360)); {code}
>  
> It

Re: [PR] [FLINK-22748][connector-kafka] Allow dynamic target topic selection in SQL Kafka sinks [flink-connector-kafka]

2024-07-23 Thread via GitHub


klam-shop commented on code in PR #109:
URL: 
https://github.com/apache/flink-connector-kafka/pull/109#discussion_r1688236979


##
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java:
##
@@ -144,14 +147,34 @@ public void open(
 valueSerialization.open(context);
 }
 
+private String getTargetTopic(RowData element) {
+final String topic = readMetadata(element, 
KafkaDynamicSink.WritableMetadata.TOPIC);
+if (topic == null && topics == null) {
+throw new IllegalArgumentException(
+"The topic of the sink record is not valid. Expected a 
single topic but no topic is set.");
+} else if (topic == null && topics.size() == 1) {
+return topics.get(0);
+} else if (topics != null && topics.size() > 0 && 
!topics.contains(topic)) {

Review Comment:
   Open to discussion: I decided to keep topics as a List, but we can change 
it to a HashSet. I am assuming the list will be short on average.
   
   List can outperform HashSet for a small number of elements:
   https://stackoverflow.com/questions/150750/hashset-vs-list-performance



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]

2024-07-23 Thread via GitHub


mateczagany commented on code in PR #821:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1688262282


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotContext.java:
##
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.controller;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
+import org.apache.flink.kubernetes.operator.api.spec.JobKind;
+import 
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+import java.util.Optional;
+
+/** Context for reconciling a snapshot. */
+@Getter
+@RequiredArgsConstructor
+public class FlinkStateSnapshotContext {
+
+private final FlinkStateSnapshot resource;
+private final FlinkStateSnapshotStatus originalStatus;
+private final Context josdkContext;
+private final FlinkConfigManager configManager;
+
+private FlinkOperatorConfiguration operatorConfig;
+private Configuration referencedJobObserveConfig;
+private FlinkDeployment referencedJobFlinkDeployment;
+
+/**
+ * @return Operator configuration for this resource.
+ */
+public FlinkOperatorConfiguration getOperatorConfig() {
+if (operatorConfig != null) {
+return operatorConfig;
+}
+return operatorConfig =
+configManager.getOperatorConfiguration(
+getResource().getMetadata().getNamespace(), null);

Review Comment:
   Added lazy getter as well, thank you!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35886) Incorrect watermark idleness timeout accounting when subtask is backpressured/blocked

2024-07-23 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-35886:
--

 Summary: Incorrect watermark idleness timeout accounting when 
subtask is backpressured/blocked
 Key: FLINK-35886
 URL: https://issues.apache.org/jira/browse/FLINK-35886
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream, Runtime / Task
Affects Versions: 1.19.1, 1.18.1, 1.20.0
Reporter: Piotr Nowojski


Currently, when using watermarks with idleness in Flink, idleness can be 
incorrectly detected when reading records from a source that is blocked by the 
runtime. For example, this can easily happen when the source is either 
backpressured or blocked by the watermark alignment. In those cases, even though 
there are more records to be read from the source (or the source’s split), the 
runtime decides not to poll (or is unable to poll) those records. In such a case 
the idleness timeout can kick in, marking the source/source split as idle, which 
can lead to incorrect combined watermark calculations and to dropping records 
that are incorrectly marked as late.

h4. Watermark alignment

If there are two source splits, A and B, and maxAllowedWatermarkDrift is set 
to 30s:

# Partition A emitted watermark 1042 sec, while partition B sits at watermark 
1000 sec.
# {{1042s - 1000s > maxAllowedWatermarkDrift}}, so partition A is blocked by 
the watermark alignment.
# For the duration of idleTimeout, partition B is emitting some large batch of 
records, that do not advance watermark of that partition by much. For example 
either watermark for partition B stays 1000s, or is updated by a small amount 
to for example 1005s.
# idleTimeout kicks in, marking partition A as idle
# partition B finishes emitting large batch of those older records, and let's 
say now there is a gap in rowtimes. Previously partition B was emitting records 
with rowtime ~1000s, now it jumps to for example ~5000s.
# As partition A is idle, combined watermark jumps to ~5000s as well.
# Watermark alignment unblocks partition A, and it continues emitting records 
with rowtime ~1042s. But now all of those records are dropped due to being late.

h4. Backpressure

Consider two SourceOperators, A and B. Due to, for example, some data skew, it 
could happen that either only A gets backpressured, or that A is backpressured 
sooner. Either way, while A is backpressured and B is not, B can bump the 
combined watermark high enough that, when the backpressure recedes, fresh 
records from A will be considered late, leading to incorrect results.
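
For reference, a minimal configuration sketch of the setup described above 
(the event class and timestamp extraction below are placeholders):
{code:java}
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

WatermarkStrategy<MyEvent> strategy =
        WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((event, ts) -> event.getEventTime())
                // idleness timeout that can currently fire while the split is blocked
                .withIdleness(Duration.ofSeconds(60))
                // alignment group with maxAllowedWatermarkDrift = 30s, update interval = 1s
                .withWatermarkAlignment("alignment-group", Duration.ofSeconds(30), Duration.ofSeconds(1));
{code}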



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-35886) Incorrect watermark idleness timeout accounting when subtask is backpressured/blocked

2024-07-23 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reassigned FLINK-35886:
--

Assignee: Piotr Nowojski

> Incorrect watermark idleness timeout accounting when subtask is 
> backpressured/blocked
> -
>
> Key: FLINK-35886
> URL: https://issues.apache.org/jira/browse/FLINK-35886
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Runtime / Task
>Affects Versions: 1.18.1, 1.20.0, 1.19.1
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Critical
>
> Currently when using watermark with idleness in Flink, idleness can be 
> incorrectly detected when reading records from a source that is blocked by 
> the runtime. For example this can easily happen when source is either 
> backpressured, or blocked by the watermark alignment. In those cases, despite 
> there are more records to be read from the source (or source’s split), 
> runtime is deciding not to poll (or being unable to) those records. In such 
> case idleness timeout can kick in, marking source/source split as idle, which 
> can lead to incorrect combined watermark calculations and dropping of 
> incorrectly marked late records.
> h4. Watermark alignment
> If there are two source splits, A and B , and maxAllowedWatermarkDrift is set 
> to 30s. 
> # Partition A emitted watermark 1042 sec, while partition B sits at watermark 
> 1000 sec.
> # {{1042s - 1000s > maxAllowedWatermarkDrift}}, so partition A is blocked by 
> the watermark alignment.
> # For the duration of idleTimeout, partition B is emitting some large batch 
> of records, that do not advance watermark of that partition by much. For 
> example either watermark for partition B stays 1000s, or is updated by a 
> small amount to for example 1005s.
> # idleTimeout kicks in, marking partition A as idle
> # partition B finishes emitting large batch of those older records, and let's 
> say now there is a gap in rowtimes. Previously partition B was emitting 
> records with rowtime ~1000s, now it jumps to for example ~5000s.
> # As partition A is idle, combined watermark jumps to ~5000s as well.
> # Watermark alignment unblocks partition A, and it continues emitting records 
> with rowtime ~1042s. But now all of those records are dropped due to being 
> late.
> h4. Backpressure
> When there are two SourceOperator’s, A and B. Due to for example some data 
> skew, it could happen that either only A gets backpressured, or A is 
> backpressured quicker/sooner. Either way, during that time when A is 
> backpressured, while B is not, B can bump the combined watermark high enough, 
> so that when backpressure recedes, fresh records from A will be considered as 
> late, leading to incorrect results.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35886) Incorrect watermark idleness timeout accounting when subtask is backpressured/blocked

2024-07-23 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868098#comment-17868098
 ] 

Piotr Nowojski commented on FLINK-35886:


I think this problem requires a public API change, so I will publish a FLIP 
shortly.

> Incorrect watermark idleness timeout accounting when subtask is 
> backpressured/blocked
> -
>
> Key: FLINK-35886
> URL: https://issues.apache.org/jira/browse/FLINK-35886
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Runtime / Task
>Affects Versions: 1.18.1, 1.20.0, 1.19.1
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Critical
>
> Currently when using watermark with idleness in Flink, idleness can be 
> incorrectly detected when reading records from a source that is blocked by 
> the runtime. For example this can easily happen when source is either 
> backpressured, or blocked by the watermark alignment. In those cases, despite 
> there are more records to be read from the source (or source’s split), 
> runtime is deciding not to poll (or being unable to) those records. In such 
> case idleness timeout can kick in, marking source/source split as idle, which 
> can lead to incorrect combined watermark calculations and dropping of 
> incorrectly marked late records.
> h4. Watermark alignment
> If there are two source splits, A and B , and maxAllowedWatermarkDrift is set 
> to 30s. 
> # Partition A emitted watermark 1042 sec, while partition B sits at watermark 
> 1000 sec.
> # {{1042s - 1000s > maxAllowedWatermarkDrift}}, so partition A is blocked by 
> the watermark alignment.
> # For the duration of idleTimeout, partition B is emitting some large batch 
> of records, that do not advance watermark of that partition by much. For 
> example either watermark for partition B stays 1000s, or is updated by a 
> small amount to for example 1005s.
> # idleTimeout kicks in, marking partition A as idle
> # partition B finishes emitting large batch of those older records, and let's 
> say now there is a gap in rowtimes. Previously partition B was emitting 
> records with rowtime ~1000s, now it jumps to for example ~5000s.
> # As partition A is idle, combined watermark jumps to ~5000s as well.
> # Watermark alignment unblocks partition A, and it continues emitting records 
> with rowtime ~1042s. But now all of those records are dropped due to being 
> late.
> h4. Backpressure
> When there are two SourceOperator’s, A and B. Due to for example some data 
> skew, it could happen that either only A gets backpressured, or A is 
> backpressured quicker/sooner. Either way, during that time when A is 
> backpressured, while B is not, B can bump the combined watermark high enough, 
> so that when backpressure recedes, fresh records from A will be considered as 
> late, leading to incorrect results.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35272][cdc][runtime] Pipeline Transform job supports omitting / renaming calculation column [flink-cdc]

2024-07-23 Thread via GitHub


yuxiqian commented on code in PR #3285:
URL: https://github.com/apache/flink-cdc/pull/3285#discussion_r1688271544


##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java:
##


Review Comment:
   I modified the expression evaluation logic here since the previous version 
could not correctly handle expressions like `filter: a > 1 and a < 2`: it 
generated duplicated argument names, which would crash the Janino compiler.
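   
   Roughly, the fix boils down to collecting each referenced column only once 
before generating the expression arguments (a simplified sketch; the variable 
names below are illustrative, not the actual code):
   ```java
   // For `a > 1 and a < 2` the referenced columns are ["a", "a"];
   // a LinkedHashSet keeps the original order while dropping the duplicate,
   // so the generated expression method gets each argument name exactly once.
   List<String> argumentNames = new ArrayList<>(new LinkedHashSet<>(referencedColumns));
   ```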



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35272][cdc][runtime] Pipeline Transform job supports omitting / renaming calculation column [flink-cdc]

2024-07-23 Thread via GitHub


yuxiqian commented on code in PR #3285:
URL: https://github.com/apache/flink-cdc/pull/3285#discussion_r1688271544


##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java:
##


Review Comment:
   I modified the expression evaluation logic here since the previous version 
could not correctly handle expressions like `filter: a > 1 and a < 2`: it 
generated duplicated argument names, which would crash the operator.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-07-23 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1688281873


##
flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsSinkBuilderTest.java:
##
@@ -25,14 +25,6 @@
 /** Covers construction, defaults and sanity checking of {@link 
SqsSinkBuilder}. */
 class SqsSinkBuilderTest {
 
-@Test
-void elementConverterOfSinkMustBeSetWhenBuilt() {
-Assertions.assertThatExceptionOfType(NullPointerException.class)
-.isThrownBy(() -> 
SqsSink.builder().setSqsUrl("sqlUrl").build())
-.withMessageContaining(
-"No SerializationSchema was supplied to the SQS Sink 
builder.");
-}
-

Review Comment:
   Because we now add a default SerializationSchema when the customer does not 
provide one, this error can no longer be thrown.
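   
   For context, the fallback is shaped roughly like this (sketch only; 
`defaultSerializationSchema()` is a hypothetical helper, not necessarily the 
actual builder code):
   ```java
   // Inside the builder's build(): apply a default schema instead of failing
   // with a NullPointerException when the user did not supply one.
   if (serializationSchema == null) {
       serializationSchema = defaultSerializationSchema(); // hypothetical helper
   }
   ```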



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-35886) Incorrect watermark idleness timeout accounting when subtask is backpressured/blocked

2024-07-23 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868098#comment-17868098
 ] 

Piotr Nowojski edited comment on FLINK-35886 at 7/23/24 3:45 PM:
-

I think this problem requires a public API change, so I have published a FLIP 
for this: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-471%3A++Fixing+watermark+idleness+timeout+accounting


was (Author: pnowojski):
I think this problem requires a public API change, so I will publish a FLIP 
shortly.

> Incorrect watermark idleness timeout accounting when subtask is 
> backpressured/blocked
> -
>
> Key: FLINK-35886
> URL: https://issues.apache.org/jira/browse/FLINK-35886
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Runtime / Task
>Affects Versions: 1.18.1, 1.20.0, 1.19.1
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Critical
>
> Currently when using watermark with idleness in Flink, idleness can be 
> incorrectly detected when reading records from a source that is blocked by 
> the runtime. For example this can easily happen when source is either 
> backpressured, or blocked by the watermark alignment. In those cases, despite 
> there are more records to be read from the source (or source’s split), 
> runtime is deciding not to poll (or being unable to) those records. In such 
> case idleness timeout can kick in, marking source/source split as idle, which 
> can lead to incorrect combined watermark calculations and dropping of 
> incorrectly marked late records.
> h4. Watermark alignment
> If there are two source splits, A and B , and maxAllowedWatermarkDrift is set 
> to 30s. 
> # Partition A emitted watermark 1042 sec, while partition B sits at watermark 
> 1000 sec.
> # {{1042s - 1000s > maxAllowedWatermarkDrift}}, so partition A is blocked by 
> the watermark alignment.
> # For the duration of idleTimeout, partition B is emitting some large batch 
> of records, that do not advance watermark of that partition by much. For 
> example either watermark for partition B stays 1000s, or is updated by a 
> small amount to for example 1005s.
> # idleTimeout kicks in, marking partition A as idle
> # partition B finishes emitting large batch of those older records, and let's 
> say now there is a gap in rowtimes. Previously partition B was emitting 
> records with rowtime ~1000s, now it jumps to for example ~5000s.
> # As partition A is idle, combined watermark jumps to ~5000s as well.
> # Watermark alignment unblocks partition A, and it continues emitting records 
> with rowtime ~1042s. But now all of those records are dropped due to being 
> late.
> h4. Backpressure
> When there are two SourceOperator’s, A and B. Due to for example some data 
> skew, it could happen that either only A gets backpressured, or A is 
> backpressured quicker/sooner. Either way, during that time when A is 
> backpressured, while B is not, B can bump the combined watermark high enough, 
> so that when backpressure recedes, fresh records from A will be considered as 
> late, leading to incorrect results.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35887) Null Pointer Exception in TypeExtractor.isRecord when trying to provide type info for interface

2024-07-23 Thread Jacob Jona Fahlenkamp (Jira)
Jacob Jona Fahlenkamp created FLINK-35887:
-

 Summary: Null Pointer Exception in TypeExtractor.isRecord when 
trying to provide type info for interface
 Key: FLINK-35887
 URL: https://issues.apache.org/jira/browse/FLINK-35887
 Project: Flink
  Issue Type: Bug
  Components: API / Type Serialization System
Affects Versions: 1.19.1
Reporter: Jacob Jona Fahlenkamp


The following code
{code:java}
import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.PojoTestUtils;
import org.junit.jupiter.api.Test;

import java.lang.reflect.Type;
import java.util.Map;

public class DebugTest {
@TypeInfo(FooFactory.class)
public interface Foo{}

public static class FooFactory extends TypeInfoFactory<Foo> {
   @Override
   public TypeInformation<Foo> createTypeInfo(Type type, Map<String, TypeInformation<?>> map) {
  return Types.POJO(Foo.class, Map.of());
   }
}

@Test
void test() {
   PojoTestUtils.assertSerializedAsPojo(Foo.class);
}
} {code}
throws this exception:
{code:java}
java.lang.NullPointerException: Cannot invoke "java.lang.Class.getName()" 
because the return value of "java.lang.Class.getSuperclass()" is null
at 
org.apache.flink.api.java.typeutils.TypeExtractor.isRecord(TypeExtractor.java:2227)
  at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.<init>(PojoSerializer.java:125)
   at 
org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:359)
 at 
org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:347)
 at 
org.apache.flink.types.PojoTestUtils.assertSerializedAsPojo(PojoTestUtils.java:48)
 {code}
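
A null-safe variant of the check would avoid the NPE for interfaces (just a 
sketch of a possible fix, not the actual Flink implementation):
{code:java}
// Interfaces (like Foo above) have no superclass, so getSuperclass() returns
// null and the current unguarded getName() call throws the NPE.
static boolean isRecord(Class<?> clazz) {
    Class<?> superclass = clazz.getSuperclass();
    return superclass != null && "java.lang.Record".equals(superclass.getName());
}
{code}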



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35887) Null Pointer Exception in TypeExtractor.isRecord when trying to provide type info for interface

2024-07-23 Thread Jacob Jona Fahlenkamp (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jacob Jona Fahlenkamp updated FLINK-35887:
--
Priority: Major  (was: Minor)

> Null Pointer Exception in TypeExtractor.isRecord when trying to provide type 
> info for interface
> ---
>
> Key: FLINK-35887
> URL: https://issues.apache.org/jira/browse/FLINK-35887
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.19.1
>Reporter: Jacob Jona Fahlenkamp
>Priority: Major
>
> The following code
> {code:java}
> import org.apache.flink.api.common.typeinfo.TypeInfo;
> import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.types.PojoTestUtils;
> import org.junit.jupiter.api.Test;
> import java.lang.reflect.Type;
> import java.util.Map;
> public class DebugTest {
> @TypeInfo(FooFactory.class)
> public interface Foo{}
> public static class FooFactory extends TypeInfoFactory<Foo> {
>@Override
>public TypeInformation<Foo> createTypeInfo(Type type, Map<String, TypeInformation<?>> map) {
>   return Types.POJO(Foo.class, Map.of());
>}
> }
> @Test
> void test() {
>PojoTestUtils.assertSerializedAsPojo(Foo.class);
> }
> } {code}
> throws this exception:
> {code:java}
> java.lang.NullPointerException: Cannot invoke "java.lang.Class.getName()" 
> because the return value of "java.lang.Class.getSuperclass()" is null
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.isRecord(TypeExtractor.java:2227)
>   at 
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.<init>(PojoSerializer.java:125)
>at 
> org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:359)
>  at 
> org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:347)
>  at 
> org.apache.flink.types.PojoTestUtils.assertSerializedAsPojo(PojoTestUtils.java:48)
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]

2024-07-23 Thread via GitHub


mateczagany commented on code in PR #821:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1688463075


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/AuditUtils.java:
##
@@ -53,6 +62,16 @@ private static String format(@NonNull CommonStatus 
status) {
 : status.getError());
 }
 
+private static String format(@NonNull FlinkStateSnapshotStatus status) {
+if (StringUtils.isEmpty(status.getError())) {
+return String.format(
+">>> Status[Snapshot] | Info | %s | %s", 
status.getState(), status.getPath());

Review Comment:
   I have added some changes for this, thank you!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]

2024-07-23 Thread via GitHub


mateczagany commented on code in PR #821:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1688472333


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##
@@ -716,6 +695,48 @@ public CheckpointFetchResult fetchCheckpointInfo(
 }
 }
 
+@Override
+public CheckpointStatsResult fetchCheckpointStats(
+String jobId, Long checkpointId, Configuration conf) {
+try (RestClusterClient clusterClient = getClusterClient(conf)) 
{
+var checkpointStatusHeaders = 
CheckpointStatisticDetailsHeaders.getInstance();
+var parameters = 
checkpointStatusHeaders.getUnresolvedMessageParameters();
+parameters.jobPathParameter.resolve(JobID.fromHexString(jobId));
+
+// This was needed because the parameter is protected
+var checkpointIdPathParameter =
+(CheckpointIdPathParameter) 
Iterables.getLast(parameters.getPathParameters());
+checkpointIdPathParameter.resolve(checkpointId);
+
+var response =
+clusterClient.sendRequest(
+checkpointStatusHeaders, parameters, 
EmptyRequestBody.getInstance());
+
+var stats = response.get();
+if (stats == null) {
+throw new IllegalStateException("Checkpoint ID %d for job %s 
does not exist!");
+} else if (stats instanceof 
CheckpointStatistics.CompletedCheckpointStatistics) {
+return CheckpointStatsResult.completed(
+((CheckpointStatistics.CompletedCheckpointStatistics) 
stats)
+.getExternalPath());
+} else if (stats instanceof 
CheckpointStatistics.FailedCheckpointStatistics) {
+return CheckpointStatsResult.error(
+((CheckpointStatistics.FailedCheckpointStatistics) 
stats)
+.getFailureMessage());
+} else if (stats instanceof 
CheckpointStatistics.PendingCheckpointStatistics) {
+return CheckpointStatsResult.pending();
+} else {
+throw new IllegalArgumentException(
+String.format(
+"Unknown checkpoint statistics result class: 
%s",
+stats.getClass().getSimpleName()));
+}
+} catch (Exception e) {
+LOG.error("Exception while fetching checkpoint statistics", e);

Review Comment:
   We should handle cases where the checkpoint statistics are no longer stored 
on the web server, and we get the following error: 
   ```
   Could not find checkpointing statistics for checkpoint 243.
   ```
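   
   Something along these lines could work as a starting point (just a sketch of 
the catch clause; whether the "not found" case should surface as an error or be 
retried is up for discussion, and the message matching is an assumption):
   ```java
   } catch (Exception e) {
       // The REST API evicts old checkpoint statistics, so the stats for this
       // checkpoint ID may no longer exist on the web server.
       if (e.getMessage() != null
               && e.getMessage().contains("Could not find checkpointing statistics")) {
           return CheckpointStatsResult.error(e.getMessage());
       }
       LOG.error("Exception while fetching checkpoint statistics", e);
       throw new RuntimeException(e);
   }
   ```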



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35285) Autoscaler key group optimization can interfere with scale-down.max-factor

2024-07-23 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868168#comment-17868168
 ] 

Gyula Fora commented on FLINK-35285:


{noformat}
I would argue that a current parallelism 40 is not very close to the max 
parallelism of 120, though. Maybe our patterns are outside the norm? But to me 
this seems well within a "normal" range.{noformat}
My production recommendation is to start with 720 as the max parallelism for 
anything but the smallest jobs. Your parallelism setting is definitely in the 
normal range, but I think your max parallelism is not ideal given how key 
distribution works in Flink, especially if you want to use the autoscaler.

I don't really understand your proposal. Let's say the ideal parallelism 
computed using the scale factor is X: the autoscaler can decide to scale to 
anything larger than X, as that would still satisfy the throughput 
requirement, but we can never scale to lower than X. This bound holds for both 
scale-ups and scale-downs. We always have to find the parallelism closest to X 
that is still larger than or equal to it.
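
For illustration, the key group alignment essentially searches upwards from 
the throughput-based target X until it finds a parallelism that divides the 
max parallelism evenly (simplified sketch, not the actual operator code):
{code:java}
// x = ideal parallelism from the scale factor, e.g. 48 for 60 * 0.8
static int align(int x, int maxParallelism, int upperBound) {
    for (int p = x; p <= maxParallelism / 2 && p <= upperBound; p++) {
        if (maxParallelism % p == 0) {
            return p; // first divisor of maxParallelism that is >= x
        }
    }
    return x; // fall back to the unaligned target
}
{code}
With maxParallelism = 360 and x = 48, the first divisor found is 60, i.e. the 
current parallelism again, which is why the vertex in the reported case never 
scales down.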

> Autoscaler key group optimization can interfere with scale-down.max-factor
> --
>
> Key: FLINK-35285
> URL: https://issues.apache.org/jira/browse/FLINK-35285
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Reporter: Trystan
>Priority: Minor
>
> When setting a less aggressive scale down limit, the key group optimization 
> can prevent a vertex from scaling down at all. It will hunt from target 
> upwards to maxParallelism/2, and will always find currentParallelism again.
>  
> A simple test trying to scale down from a parallelism of 60 with a 
> scale-down.max-factor of 0.2:
> {code:java}
> assertEquals(48, JobVertexScaler.scale(60, inputShipStrategies, 360, .8, 8, 
> 360)); {code}
>  
> It seems reasonable to make a good attempt to spread data across subtasks, 
> but not at the expense of total deadlock. The problem is that during scale 
> down it doesn't actually ensure that newParallelism will be < 
> currentParallelism. The only workaround is to set a scale down factor large 
> enough such that it finds the next lowest divisor of the maxParallelism.
>  
> Clunky, but something to ensure it can make at least some progress. There is 
> another test that now fails, but just to illustrate the point:
> {code:java}
> for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) 
> {
> if ((scaleFactor < 1 && p < currentParallelism) || (scaleFactor > 1 && p 
> > currentParallelism)) {
> if (maxParallelism % p == 0) {
> return p;
> }
> }
> } {code}
>  
> Perhaps this is by design and not a bug, but total failure to scale down in 
> order to keep optimized key groups does not seem ideal.
>  
> Key group optimization block:
> [https://github.com/apache/flink-kubernetes-operator/blob/fe3d24e4500d6fcaed55250ccc816546886fd1cf/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L296C1-L303C10]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35285) Autoscaler key group optimization can interfere with scale-down.max-factor

2024-07-23 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868169#comment-17868169
 ] 

Gyula Fora commented on FLINK-35285:


It's much better to not scale down than to undershoot the parallelism because 
that will immediately lead to problems.

> Autoscaler key group optimization can interfere with scale-down.max-factor
> --
>
> Key: FLINK-35285
> URL: https://issues.apache.org/jira/browse/FLINK-35285
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Reporter: Trystan
>Priority: Minor
>
> When setting a less aggressive scale down limit, the key group optimization 
> can prevent a vertex from scaling down at all. It will hunt from target 
> upwards to maxParallelism/2, and will always find currentParallelism again.
>  
> A simple test trying to scale down from a parallelism of 60 with a 
> scale-down.max-factor of 0.2:
> {code:java}
> assertEquals(48, JobVertexScaler.scale(60, inputShipStrategies, 360, .8, 8, 
> 360)); {code}
>  
> It seems reasonable to make a good attempt to spread data across subtasks, 
> but not at the expense of total deadlock. The problem is that during scale 
> down it doesn't actually ensure that newParallelism will be < 
> currentParallelism. The only workaround is to set a scale down factor large 
> enough such that it finds the next lowest divisor of the maxParallelism.
>  
> Clunky, but something to ensure it can make at least some progress. There is 
> another test that now fails, but just to illustrate the point:
> {code:java}
> for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) 
> {
> if ((scaleFactor < 1 && p < currentParallelism) || (scaleFactor > 1 && p 
> > currentParallelism)) {
> if (maxParallelism % p == 0) {
> return p;
> }
> }
> } {code}
>  
> Perhaps this is by design and not a bug, but total failure to scale down in 
> order to keep optimized key groups does not seem ideal.
>  
> Key group optimization block:
> [https://github.com/apache/flink-kubernetes-operator/blob/fe3d24e4500d6fcaed55250ccc816546886fd1cf/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L296C1-L303C10]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [DRAFT][FLINK-34440][formats][protobuf-confluent] Protobuf confluent dynamic format [flink]

2024-07-23 Thread via GitHub


dmariassy opened a new pull request, #25114:
URL: https://github.com/apache/flink/pull/25114

   **This is a DRAFT PR**.
   
   ## TODO
   
   - Add missing boilerplate (e.g. config)
   - Add debezium support
   - Test the format in Shopify Flink jobs
   - Docs (might be a separate PR)
   
   ## What is the purpose of the change
   
   - Add support for deserializing protobuf messages using the Confluent wire 
format and whose schemas can be fetched from Confluent Schema Registry
   - Add support for serializing Flink records using the Confluent protobuf 
wire format
   
   ## Brief change log
   
   My intention was to:
   - Maintain parity with the existing flink-protobuf format's semantics in 
terms of the Flink -> Protobuf / Protobuf -> Flink conversions
   - Maximize code reuse between flink-protobuf-confluent and flink-protobuf 
formats
   
   ### Deserializer
   
   - Fetch the message's protobuf descriptor from the Confluent schema registry
   - Generate a java class from the descriptor at runtime
   - Deserialize `byte[]`s to the generated `protobuf.Message` type using a 
`io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer`
   - Delegate the work of converting between a `protobuf.Message` and a 
`RowData` object to the existing flink-protobuf format
   
   ### Serializer
   
   - Convert the user's `RowType` to a protobuf descriptor
   - Generate a java class from the descriptor at runtime
   - Delegate the `RowData` -> `AbstractMessage` conversion to the existing 
flink-protobuf format
   - Serialize the `AbstractMessage` object using a 
`io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer`
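   
   To make the intended flow concrete, the deserialization path boils down to 
roughly the following (hand-wavy sketch; the Confluent deserializer call is the 
real API, while the converter name on the Flink side is an assumption about the 
existing flink-protobuf internals):
   ```java
   // 1. KafkaProtobufDeserializer resolves the writer schema from Schema Registry
   //    (Confluent wire format: magic byte + schema id) and returns a protobuf
   //    Message instance of the runtime-generated class.
   Message message = kafkaProtobufDeserializer.deserialize(topic, messageBytes);

   // 2. The existing flink-protobuf runtime converter then turns the Message into
   //    RowData, so Flink <-> protobuf semantics stay identical to the static format.
   RowData row = protoToRowConverter.convertProtoBinaryToRow(message.toByteArray());
   ```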
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow [the 
conventions for tests defined in our code quality 
guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing).
   
   This change added tests and can be verified as follows:
   
   - Added comprehensive test coverage
   - Will shortly deploy to Shopify Flink clusters 
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): yes 
(com.github.os72:protoc-jar)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: yes
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? docs to follow
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


