[PR] [FLINK-35868][mongo] Add MongoDB 6.0 & 7.0 tests [flink-cdc]
yuxiqian opened a new pull request, #3489: URL: https://github.com/apache/flink-cdc/pull/3489 This closes FLINK-35868. It allows Mongo CDC test cases to run on 6.0.16 & 7.0.12 (and legacy 5.0.2). Notice: since JUnit doesn't allow parameterized `@ClassRule` or static fields, containers must be created/destroyed before/after each test case, which significantly slows down testing (~50 min). More discussion is required on this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
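For reference, the per-test container pattern described in the PR could look roughly like the sketch below. It is an illustrative JUnit 4 + Testcontainers example rather than the actual code in #3489; the image tags mirror the versions mentioned above, and the test body is a placeholder.

```java
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.testcontainers.containers.MongoDBContainer;
import org.testcontainers.utility.DockerImageName;

import java.util.Arrays;
import java.util.Collection;

/** Sketch: since a static @ClassRule cannot be parameterized, each test gets its own container. */
@RunWith(Parameterized.class)
public class MongoVersionedTestSketch {

    @Parameterized.Parameters(name = "mongoImage = {0}")
    public static Collection<String> mongoImages() {
        // Hypothetical image list mirroring the versions in the PR description.
        return Arrays.asList("mongo:5.0.2", "mongo:6.0.16", "mongo:7.0.12");
    }

    @Parameterized.Parameter public String mongoImage;

    private MongoDBContainer container;

    @Before
    public void startContainer() {
        // The container depends on the injected parameter, so it must be started
        // (and stopped) around every test method instead of once per class.
        container = new MongoDBContainer(DockerImageName.parse(mongoImage));
        container.start();
    }

    @After
    public void stopContainer() {
        container.stop();
    }

    @Test
    public void exposesConnectionString() {
        // Placeholder assertion; real tests would run the CDC source against the container.
        Assert.assertNotNull(container.getConnectionString());
    }
}
```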
Re: [PR] [FLINK-35868][mongo] Add MongoDB 6.0 & 7.0 tests [flink-cdc]
yuxiqian commented on PR #3489: URL: https://github.com/apache/flink-cdc/pull/3489#issuecomment-2244493624 @leonardBang @Jiabao-Sun PTAL -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35868) Bump Mongo driver version to support Mongo 7.0+
[ https://issues.apache.org/jira/browse/FLINK-35868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35868: --- Labels: pull-request-available (was: ) > Bump Mongo driver version to support Mongo 7.0+ > --- > > Key: FLINK-35868 > URL: https://issues.apache.org/jira/browse/FLINK-35868 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: yux >Priority: Major > Labels: pull-request-available > > Currently, MongoDB CDC connector depends on mongodb-driver v4.9.1, which > doesn't support Mongo Server 7.0+[1]. Upgrading dependency version would be > nice since Mongo 7.0 has been released nearly a year ago. > [1] https://www.mongodb.com/docs/drivers/java/sync/current/compatibility/ -- This message was sent by Atlassian Jira (v8.20.10#820010)
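As a rough illustration of the compatibility concern (a minimal sketch assuming the mongodb-driver-sync API; the connection string is a placeholder), one can check which server version the driver is actually talking to:

{code:java}
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;

import org.bson.Document;

/** Sketch: query the connected server's version via the buildInfo command. */
public class MongoServerVersionCheck {
    public static void main(String[] args) {
        // Placeholder connection string; point it at a MongoDB 7.0 deployment to test.
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            Document buildInfo =
                    client.getDatabase("admin").runCommand(new Document("buildInfo", 1));
            // The 4.9.x driver line is not listed as compatible with server 7.0+,
            // which is why this ticket proposes bumping the driver version.
            System.out.println("Connected to MongoDB server " + buildInfo.getString("version"));
        }
    }
}
{code}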
[jira] [Created] (FLINK-35876) mysql cdc may fail to change from snapshot mode to binlog mode when cpu load of mysql server is 100%
Youle created FLINK-35876: - Summary: mysql cdc may fail to change from snapshot mode to binlog mode when cpu load of mysql server is 100% Key: FLINK-35876 URL: https://issues.apache.org/jira/browse/FLINK-35876 Project: Flink Issue Type: Bug Reporter: Youle -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35876) mysql cdc may fail to change from snapshot mode to binlog mode when cpu load of mysql server is 100%
[ https://issues.apache.org/jira/browse/FLINK-35876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Youle updated FLINK-35876: -- Attachment: image-2024-07-23-15-47-10-376.png Description: I use mysql cdc to read data from mysql and write it to doris. mysql cdc may fail to change from snapshot mode to binlog mode when I set up 2 flink jobs (source parallelism of 20) and the cpu load of the mysql server is 100%. It can't switch even after the cpu load drops; it just hangs and won't consume the latest binlog data. But when I only set up one flink job (source parallelism of 20) and the cpu load of the mysql server is less than 100%, that failure never happens. Is there anything we can do to avoid this failure if we can't guarantee the cpu load is always in a healthy state? !image-2024-07-23-15-47-10-376.png! > mysql cdc may fail to change from snapshot mode to binlog mode when cpu load of mysql server is 100% > --- > > Key: FLINK-35876 > URL: https://issues.apache.org/jira/browse/FLINK-35876 > Project: Flink > Issue Type: Bug > Reporter: Youle > Priority: Major > Attachments: image-2024-07-23-15-47-10-376.png > > > I use mysql cdc to read data from mysql and write it to doris. mysql cdc may fail to change from snapshot mode to binlog mode when I set up 2 flink jobs (source parallelism of 20) and the cpu load of the mysql server is 100%. It can't switch even after the cpu load drops; it just hangs and won't consume the latest binlog data. But when I only set up one flink job (source parallelism of 20) and the cpu load of the mysql server is less than 100%, that failure never happens. > Is there anything we can do to avoid this failure if we can't guarantee the cpu load is always in a healthy state? > !image-2024-07-23-15-47-10-376.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35876) mysql cdc may fail to change from snapshot mode to binlog mode when cpu load of mysql server is 100%
[ https://issues.apache.org/jira/browse/FLINK-35876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Youle updated FLINK-35876: -- Description: I use mysql cdc to read data from mysql and write it to doris. mysql cdc may fail to change from snapshot mode to binlog mode when I set up 2 flink jobs (source parallelism of 20) and the cpu load of the mysql server is 100%. It can't switch even after the cpu load drops; it just hangs, won't consume the latest binlog data, and there is no exception information in the logs. But when I only set up one flink job (source parallelism of 20) and the cpu load of the mysql server is less than 100%, that failure never happens. Is there anything we can do to avoid this failure if we can't guarantee the cpu load is always in a healthy state? !image-2024-07-23-15-47-10-376.png! was: I use mysql cdc to read data from mysql and write it to doris. mysql cdc may fail to change from snapshot mode to binlog mode when I set up 2 flink jobs (source parallelism of 20) and the cpu load of the mysql server is 100%. It can't switch even after the cpu load drops; it just hangs and won't consume the latest binlog data. But when I only set up one flink job (source parallelism of 20) and the cpu load of the mysql server is less than 100%, that failure never happens. Is there anything we can do to avoid this failure if we can't guarantee the cpu load is always in a healthy state? !image-2024-07-23-15-47-10-376.png! > mysql cdc may fail to change from snapshot mode to binlog mode when cpu load of mysql server is 100% > --- > > Key: FLINK-35876 > URL: https://issues.apache.org/jira/browse/FLINK-35876 > Project: Flink > Issue Type: Bug > Reporter: Youle > Priority: Major > Attachments: image-2024-07-23-15-47-10-376.png > > > I use mysql cdc to read data from mysql and write it to doris. mysql cdc may fail to change from snapshot mode to binlog mode when I set up 2 flink jobs (source parallelism of 20) and the cpu load of the mysql server is 100%. It can't switch even after the cpu load drops; it just hangs, won't consume the latest binlog data, and there is no exception information in the logs. But when I only set up one flink job (source parallelism of 20) and the cpu load of the mysql server is less than 100%, that failure never happens. > Is there anything we can do to avoid this failure if we can't guarantee the cpu load is always in a healthy state? > !image-2024-07-23-15-47-10-376.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35876) mysql cdc may fail to change from snapshot mode to binlog mode when cpu load of mysql server is 100%
[ https://issues.apache.org/jira/browse/FLINK-35876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Youle updated FLINK-35876: -- Description: I use mysql cdc to read data from mysql and write it to doris. mysql cdc may fail to change from snapshot mode to binlog mode when I set up 2 flink jobs (source parallelism of 20) and the cpu load of the mysql server is 100%. It can't switch even after the cpu load drops; it just hangs, won't consume the latest binlog data, and there is no exception information in the logs. But when I only set up one flink job (source parallelism of 20) and the cpu load of the mysql server is less than 100%, that failure never happens. mysql table: about 250 million rows flink version: 1.18.1 mysql cdc connector version: 3.1.0 doris connector version: 1.18-1.6.2 Is there anything we can do to avoid this failure if we can't guarantee the cpu load is always in a healthy state? !image-2024-07-23-15-47-10-376.png! was: I use mysql cdc to read data from mysql and write it to doris. mysql cdc may fail to change from snapshot mode to binlog mode when I set up 2 flink jobs (source parallelism of 20) and the cpu load of the mysql server is 100%. It can't switch even after the cpu load drops; it just hangs, won't consume the latest binlog data, and there is no exception information in the logs. But when I only set up one flink job (source parallelism of 20) and the cpu load of the mysql server is less than 100%, that failure never happens. Is there anything we can do to avoid this failure if we can't guarantee the cpu load is always in a healthy state? !image-2024-07-23-15-47-10-376.png! > mysql cdc may fail to change from snapshot mode to binlog mode when cpu load of mysql server is 100% > --- > > Key: FLINK-35876 > URL: https://issues.apache.org/jira/browse/FLINK-35876 > Project: Flink > Issue Type: Bug > Reporter: Youle > Priority: Major > Attachments: image-2024-07-23-15-47-10-376.png > > > I use mysql cdc to read data from mysql and write it to doris. mysql cdc may fail to change from snapshot mode to binlog mode when I set up 2 flink jobs (source parallelism of 20) and the cpu load of the mysql server is 100%. It can't switch even after the cpu load drops; it just hangs, won't consume the latest binlog data, and there is no exception information in the logs. But when I only set up one flink job (source parallelism of 20) and the cpu load of the mysql server is less than 100%, that failure never happens. > mysql table: about 250 million rows > flink version: 1.18.1 > mysql cdc connector version: 3.1.0 > doris connector version: 1.18-1.6.2 > > Is there anything we can do to avoid this failure if we can't guarantee the cpu load is always in a healthy state? > !image-2024-07-23-15-47-10-376.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35877) Shade protobuf in flink
zhuanshenbsj1 created FLINK-35877: - Summary: Shade protobuf in flink Key: FLINK-35877 URL: https://issues.apache.org/jira/browse/FLINK-35877 Project: Flink Issue Type: Improvement Components: BuildSystem / Shaded Affects Versions: 1.19.2 Reporter: zhuanshenbsj1 Fix For: 1.19.2 Shade the classes in protobuf to avoid class conflict. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35876) mysql cdc may fail to change from snapshot mode to binlog mode when cpu load of mysql server is 100%
[ https://issues.apache.org/jira/browse/FLINK-35876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Youle updated FLINK-35876: -- Component/s: Flink CDC Affects Version/s: cdc-3.1.0 > mysql cdc may fail to change from snapshot mode to binlog mode when cpu load of mysql server is 100% > --- > > Key: FLINK-35876 > URL: https://issues.apache.org/jira/browse/FLINK-35876 > Project: Flink > Issue Type: Bug > Components: Flink CDC > Affects Versions: cdc-3.1.0 > Reporter: Youle > Priority: Major > Attachments: image-2024-07-23-15-47-10-376.png > > > I use mysql cdc to read data from mysql and write it to doris. mysql cdc may fail to change from snapshot mode to binlog mode when I set up 2 flink jobs (source parallelism of 20) and the cpu load of the mysql server is 100%. It can't switch even after the cpu load drops; it just hangs, won't consume the latest binlog data, and there is no exception information in the logs. But when I only set up one flink job (source parallelism of 20) and the cpu load of the mysql server is less than 100%, that failure never happens. > mysql table: about 250 million rows > flink version: 1.18.1 > mysql cdc connector version: 3.1.0 > doris connector version: 1.18-1.6.2 > > Is there anything we can do to avoid this failure if we can't guarantee the cpu load is always in a healthy state? > !image-2024-07-23-15-47-10-376.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
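For context, here is a hedged sketch of what the reported source setup might look like. It assumes the org.apache.flink.cdc.* package layout of Flink CDC 3.1; the connection details are placeholders, and the timeout/heartbeat settings are illustrative knobs one might tune under heavy server load, not a confirmed fix for the reported hang.

{code:java}
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;

/** Sketch: a MySQL CDC source with source parallelism 20, as in the report. */
public class MySqlCdcJobSketch {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> source =
                MySqlSource.<String>builder()
                        .hostname("mysql-host")         // placeholder
                        .port(3306)
                        .databaseList("app_db")         // placeholder
                        .tableList("app_db.big_table")  // ~250 million rows in the report
                        .username("flink_cdc")
                        .password("******")
                        // Illustrative settings that may help while the server is overloaded.
                        .connectTimeout(Duration.ofSeconds(60))
                        .heartbeatInterval(Duration.ofSeconds(30))
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL CDC Source")
                .setParallelism(20) // matches the "source parallelism of 20" in the report
                .print();           // the real job writes to Doris instead of printing
        env.execute("MySQL CDC to Doris (sketch)");
    }
}
{code}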
[PR] [FLINK-35877] Shade protobuf in flink [flink]
zhuanshenbsj1 opened a new pull request, #25112: URL: https://github.com/apache/flink/pull/25112 ## What is the purpose of the change Shade the classes in protobuf to avoid class conflict. ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change Please make sure both new and modified tests in this PR follow [the conventions for tests defined in our code quality guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing). *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35877) Shade protobuf in flink
[ https://issues.apache.org/jira/browse/FLINK-35877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35877: --- Labels: pull-request-available (was: ) > Shade protobuf in flink > --- > > Key: FLINK-35877 > URL: https://issues.apache.org/jira/browse/FLINK-35877 > Project: Flink > Issue Type: Improvement > Components: BuildSystem / Shaded >Affects Versions: 1.19.2 >Reporter: zhuanshenbsj1 >Priority: Minor > Labels: pull-request-available > Fix For: 1.19.2 > > > Shade the classes in protobuf to avoid class conflict. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35880) CLONE - [Release-1.20] Stage source and binary releases on dist.apache.org
Weijie Guo created FLINK-35880: -- Summary: CLONE - [Release-1.20] Stage source and binary releases on dist.apache.org Key: FLINK-35880 URL: https://issues.apache.org/jira/browse/FLINK-35880 Project: Flink Issue Type: Sub-task Affects Versions: 1.20.0 Reporter: Weijie Guo Assignee: Weijie Guo Fix For: 1.20.0 Copy the source release to the dev repository of dist.apache.org: # If you have not already, check out the Flink section of the dev repository on dist.apache.org via Subversion. In a fresh directory: {code:bash} $ svn checkout https://dist.apache.org/repos/dist/dev/flink --depth=immediates {code} # Make a directory for the new release and copy all the artifacts (Flink source/binary distributions, hashes, GPG signatures and the python subdirectory) into that newly created directory: {code:bash} $ mkdir flink/flink-${RELEASE_VERSION}-rc${RC_NUM} $ mv /tools/releasing/release/* flink/flink-${RELEASE_VERSION}-rc${RC_NUM} {code} # Add and commit all the files. {code:bash} $ cd flink flink $ svn add flink-${RELEASE_VERSION}-rc${RC_NUM} flink $ svn commit -m "Add flink-${RELEASE_VERSION}-rc${RC_NUM}" {code} # Verify that files are present under [https://dist.apache.org/repos/dist/dev/flink|https://dist.apache.org/repos/dist/dev/flink]. # Push the release tag if not done already (the following command assumes to be called from within the apache/flink checkout): {code:bash} $ git push refs/tags/release-${RELEASE_VERSION}-rc${RC_NUM} {code} h3. Expectations * Maven artifacts deployed to the staging repository of [repository.apache.org|https://repository.apache.org/content/repositories/] * Source distribution deployed to the dev repository of [dist.apache.org|https://dist.apache.org/repos/dist/dev/flink/] * Check hashes (e.g. shasum -c *.sha512) * Check signatures (e.g. {{{}gpg --verify flink-1.2.3-source-release.tar.gz.asc flink-1.2.3-source-release.tar.gz{}}}) * {{grep}} for legal headers in each file. * If time allows check the NOTICE files of the modules whose dependencies have been changed in this release in advance, since the license issues from time to time pop up during voting. See [Verifying a Flink Release|https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+Release] "Checking License" section. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35881) [1.20-rc2] Propose a pull request for website updates
[ https://issues.apache.org/jira/browse/FLINK-35881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo updated FLINK-35881: --- Summary: [1.20-rc2] Propose a pull request for website updates (was: CLONE - [Release-1.20] Propose a pull request for website updates) > [1.20-rc2] Propose a pull request for website updates > - > > Key: FLINK-35881 > URL: https://issues.apache.org/jira/browse/FLINK-35881 > Project: Flink > Issue Type: Sub-task >Affects Versions: 1.17.0 >Reporter: Weijie Guo >Assignee: Weijie Guo >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > > > The final step of building the candidate is to propose a website pull request > containing the following changes: > # update > [apache/flink-web:_config.yml|https://github.com/apache/flink-web/blob/asf-site/_config.yml] > ## update {{FLINK_VERSION_STABLE}} and {{FLINK_VERSION_STABLE_SHORT}} as > required > ## update version references in quickstarts ({{{}q/{}}} directory) as > required > ## (major only) add a new entry to {{flink_releases}} for the release > binaries and sources > ## (minor only) update the entry for the previous release in the series in > {{flink_releases}} > ### Please pay notice to the ids assigned to the download entries. They > should be unique and reflect their corresponding version number. > ## add a new entry to {{release_archive.flink}} > # add a blog post announcing the release in _posts > # add a organized release notes page under docs/content/release-notes and > docs/content.zh/release-notes (like > [https://nightlies.apache.org/flink/flink-docs-release-1.15/release-notes/flink-1.15/]). > The page is based on the non-empty release notes collected from the issues, > and only the issues that affect existing users should be included (e.g., > instead of new functionality). It should be in a separate PR since it would > be merged to the flink project. > (!) Don’t merge the PRs before finalizing the release. > > > h3. Expectations > * Website pull request proposed to list the > [release|http://flink.apache.org/downloads.html] > * (major only) Check {{docs/config.toml}} to ensure that > ** the version constants refer to the new version > ** the {{baseurl}} does not point to {{flink-docs-master}} but > {{flink-docs-release-X.Y}} instead -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35881) CLONE - [Release-1.20] Propose a pull request for website updates
Weijie Guo created FLINK-35881: -- Summary: CLONE - [Release-1.20] Propose a pull request for website updates Key: FLINK-35881 URL: https://issues.apache.org/jira/browse/FLINK-35881 Project: Flink Issue Type: Sub-task Affects Versions: 1.17.0 Reporter: Weijie Guo Assignee: Weijie Guo Fix For: 1.20.0 The final step of building the candidate is to propose a website pull request containing the following changes: # update [apache/flink-web:_config.yml|https://github.com/apache/flink-web/blob/asf-site/_config.yml] ## update {{FLINK_VERSION_STABLE}} and {{FLINK_VERSION_STABLE_SHORT}} as required ## update version references in quickstarts ({{{}q/{}}} directory) as required ## (major only) add a new entry to {{flink_releases}} for the release binaries and sources ## (minor only) update the entry for the previous release in the series in {{flink_releases}} ### Please pay notice to the ids assigned to the download entries. They should be unique and reflect their corresponding version number. ## add a new entry to {{release_archive.flink}} # add a blog post announcing the release in _posts # add a organized release notes page under docs/content/release-notes and docs/content.zh/release-notes (like [https://nightlies.apache.org/flink/flink-docs-release-1.15/release-notes/flink-1.15/]). The page is based on the non-empty release notes collected from the issues, and only the issues that affect existing users should be included (e.g., instead of new functionality). It should be in a separate PR since it would be merged to the flink project. (!) Don’t merge the PRs before finalizing the release. h3. Expectations * Website pull request proposed to list the [release|http://flink.apache.org/downloads.html] * (major only) Check {{docs/config.toml}} to ensure that ** the version constants refer to the new version ** the {{baseurl}} does not point to {{flink-docs-master}} but {{flink-docs-release-X.Y}} instead -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35878) Build Release Candidate: 1.20.0-rc2
Weijie Guo created FLINK-35878: -- Summary: Build Release Candidate: 1.20.0-rc2 Key: FLINK-35878 URL: https://issues.apache.org/jira/browse/FLINK-35878 Project: Flink Issue Type: New Feature Affects Versions: 1.20.0 Reporter: Weijie Guo Assignee: Weijie Guo Fix For: 1.20.0 The core of the release process is the build-vote-fix cycle. Each cycle produces one release candidate. The Release Manager repeats this cycle until the community approves one release candidate, which is then finalized. h4. Prerequisites Set up a few environment variables to simplify Maven commands that follow. This identifies the release candidate being built. Start with {{RC_NUM}} equal to 1 and increment it for each candidate: {code} RC_NUM="1" TAG="release-${RELEASE_VERSION}-rc${RC_NUM}" {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35879) CLONE - [Release-1.20] Build and stage Java and Python artifacts
Weijie Guo created FLINK-35879: -- Summary: CLONE - [Release-1.20] Build and stage Java and Python artifacts Key: FLINK-35879 URL: https://issues.apache.org/jira/browse/FLINK-35879 Project: Flink Issue Type: Sub-task Affects Versions: 1.20.0 Reporter: Weijie Guo Assignee: Weijie Guo Fix For: 1.20.0 # Create a local release branch ((!) this step can not be skipped for minor releases): {code:bash} $ cd ./tools tools/ $ OLD_VERSION=$CURRENT_SNAPSHOT_VERSION NEW_VERSION=$RELEASE_VERSION RELEASE_CANDIDATE=$RC_NUM releasing/create_release_branch.sh {code} # Tag the release commit: {code:bash} $ git tag -s ${TAG} -m "${TAG}" {code} # We now need to do several things: ## Create the source release archive ## Deploy jar artefacts to the [Apache Nexus Repository|https://repository.apache.org/], which is the staging area for deploying the jars to Maven Central ## Build PyFlink wheel packages You might want to create a directory on your local machine for collecting the various source and binary releases before uploading them. Creating the binary releases is a lengthy process but you can do this on another machine (for example, in the "cloud"). When doing this, you can skip signing the release files on the remote machine, download them to your local machine and sign them there. # Build the source release: {code:bash} tools $ RELEASE_VERSION=$RELEASE_VERSION releasing/create_source_release.sh {code} # Stage the maven artifacts: {code:bash} tools $ releasing/deploy_staging_jars.sh {code} Review all staged artifacts ([https://repository.apache.org/]). They should contain all relevant parts for each module, including pom.xml, jar, test jar, source, test source, javadoc, etc. Carefully review any new artifacts. # Close the staging repository on Apache Nexus. When prompted for a description, enter “Apache Flink, version X, release candidate Y”. Then, you need to build the PyFlink wheel packages (since 1.11): # Set up an azure pipeline in your own Azure account. You can refer to [Azure Pipelines|https://cwiki.apache.org/confluence/display/FLINK/Azure+Pipelines#AzurePipelines-Tutorial:SettingupAzurePipelinesforaforkoftheFlinkrepository] for more details on how to set up azure pipeline for a fork of the Flink repository. Note that a google cloud mirror in Europe is used for downloading maven artifacts, therefore it is recommended to set your [Azure organization region|https://docs.microsoft.com/en-us/azure/devops/organizations/accounts/change-organization-location] to Europe to speed up the downloads. # Push the release candidate branch to your forked personal Flink repository, e.g. {code:bash} tools $ git push refs/heads/release-${RELEASE_VERSION}-rc${RC_NUM}:release-${RELEASE_VERSION}-rc${RC_NUM} {code} # Trigger the Azure Pipelines manually to build the PyFlink wheel packages ## Go to your Azure Pipelines Flink project → Pipelines ## Click the "New pipeline" button on the top right ## Select "GitHub" → your GitHub Flink repository → "Existing Azure Pipelines YAML file" ## Select your branch → Set path to "/azure-pipelines.yaml" → click on "Continue" → click on "Variables" ## Then click "New Variable" button, fill the name with "MODE", and the value with "release". Click "OK" to set the variable and the "Save" button to save the variables, then back on the "Review your pipeline" screen click "Run" to trigger the build. 
## You should now see a build where only the "CI build (release)" is running # Download the PyFlink wheel packages from the build result page after the jobs of "build_wheels mac" and "build_wheels linux" have finished. ## Download the PyFlink wheel packages ### Open the build result page of the pipeline ### Go to the {{Artifacts}} page (build_wheels linux -> 1 artifact) ### Click {{wheel_Darwin_build_wheels mac}} and {{wheel_Linux_build_wheels linux}} separately to download the zip files ## Unzip these two zip files {code:bash} $ cd /path/to/downloaded_wheel_packages $ unzip wheel_Linux_build_wheels\ linux.zip $ unzip wheel_Darwin_build_wheels\ mac.zip{code} ## Create directory {{./dist}} under the directory of {{{}flink-python{}}}: {code:bash} $ cd $ mkdir flink-python/dist{code} ## Move the unzipped wheel packages to the directory of {{{}flink-python/dist{}}}: {code:java} $ mv /path/to/wheel_Darwin_build_wheels\ mac/* flink-python/dist/ $ mv /path/to/wheel_Linux_build_wheels\ linux/* flink-python/dist/ $ cd tools{code} Finally, we create the binary convenience release files: {code:bash} tools $ RELEASE_VERSION=$RELEASE_VERSION releasing/create_binary_release.sh {code} If you want to run this step in parallel on a remote machine you have to make the release commit available there (for example by pushing to a repository). *This is important: the commit inside the binary builds has to match the commit of the so
[jira] [Created] (FLINK-35882) CLONE - [Release-1.20] Vote on the release candidate
Weijie Guo created FLINK-35882: -- Summary: CLONE - [Release-1.20] Vote on the release candidate Key: FLINK-35882 URL: https://issues.apache.org/jira/browse/FLINK-35882 Project: Flink Issue Type: Sub-task Affects Versions: 1.17.0 Reporter: Weijie Guo Assignee: Weijie Guo Fix For: 1.17.0 Once you have built and individually reviewed the release candidate, please share it for the community-wide review. Please review foundation-wide [voting guidelines|http://www.apache.org/foundation/voting.html] for more information. Start the review-and-vote thread on the dev@ mailing list. Here’s an email template; please adjust as you see fit. {quote}From: Release Manager To: d...@flink.apache.org Subject: [VOTE] Release 1.2.3, release candidate #3 Hi everyone, Please review and vote on the release candidate #3 for the version 1.2.3, as follows: [ ] +1, Approve the release [ ] -1, Do not approve the release (please provide specific comments) The complete staging area is available for your review, which includes: * JIRA release notes [1], * the official Apache source release and binary convenience releases to be deployed to dist.apache.org [2], which are signed with the key with fingerprint [3], * all artifacts to be deployed to the Maven Central Repository [4], * source code tag "release-1.2.3-rc3" [5], * website pull request listing the new release and adding announcement blog post [6]. The vote will be open for at least 72 hours. It is adopted by majority approval, with at least 3 PMC affirmative votes. Thanks, Release Manager [1] link [2] link [3] [https://dist.apache.org/repos/dist/release/flink/KEYS] [4] link [5] link [6] link {quote} *If there are any issues found in the release candidate, reply on the vote thread to cancel the vote.* There’s no need to wait 72 hours. Proceed to the Fix Issues step below and address the problem. However, some issues don’t require cancellation. For example, if an issue is found in the website pull request, just correct it on the spot and the vote can continue as-is. For cancelling a release, the release manager needs to send an email to the release candidate thread, stating that the release candidate is officially cancelled. Next, all artifacts created specifically for the RC in the previous steps need to be removed: * Delete the staging repository in Nexus * Remove the source / binary RC files from dist.apache.org * Delete the source code tag in git *If there are no issues, reply on the vote thread to close the voting.* Then, tally the votes in a separate email. Here’s an email template; please adjust as you see fit. {quote}From: Release Manager To: d...@flink.apache.org Subject: [RESULT] [VOTE] Release 1.2.3, release candidate #3 I'm happy to announce that we have unanimously approved this release. There are XXX approving votes, XXX of which are binding: * approver 1 * approver 2 * approver 3 * approver 4 There are no disapproving votes. Thanks everyone! {quote} h3. Expectations * Community votes to release the proposed candidate, with at least three approving PMC votes Any issues that are raised till the vote is over should be either resolved or moved into the next release (if applicable). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35880) [1.20-rc2] Stage source and binary releases on dist.apache.org
[ https://issues.apache.org/jira/browse/FLINK-35880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo updated FLINK-35880: --- Summary: [1.20-rc2] Stage source and binary releases on dist.apache.org (was: CLONE - [Release-1.20] Stage source and binary releases on dist.apache.org) > [1.20-rc2] Stage source and binary releases on dist.apache.org > -- > > Key: FLINK-35880 > URL: https://issues.apache.org/jira/browse/FLINK-35880 > Project: Flink > Issue Type: Sub-task >Affects Versions: 1.20.0 >Reporter: Weijie Guo >Assignee: Weijie Guo >Priority: Major > Fix For: 1.20.0 > > > Copy the source release to the dev repository of dist.apache.org: > # If you have not already, check out the Flink section of the dev repository > on dist.apache.org via Subversion. In a fresh directory: > {code:bash} > $ svn checkout https://dist.apache.org/repos/dist/dev/flink --depth=immediates > {code} > # Make a directory for the new release and copy all the artifacts (Flink > source/binary distributions, hashes, GPG signatures and the python > subdirectory) into that newly created directory: > {code:bash} > $ mkdir flink/flink-${RELEASE_VERSION}-rc${RC_NUM} > $ mv /tools/releasing/release/* > flink/flink-${RELEASE_VERSION}-rc${RC_NUM} > {code} > # Add and commit all the files. > {code:bash} > $ cd flink > flink $ svn add flink-${RELEASE_VERSION}-rc${RC_NUM} > flink $ svn commit -m "Add flink-${RELEASE_VERSION}-rc${RC_NUM}" > {code} > # Verify that files are present under > [https://dist.apache.org/repos/dist/dev/flink|https://dist.apache.org/repos/dist/dev/flink]. > # Push the release tag if not done already (the following command assumes to > be called from within the apache/flink checkout): > {code:bash} > $ git push refs/tags/release-${RELEASE_VERSION}-rc${RC_NUM} > {code} > > > h3. Expectations > * Maven artifacts deployed to the staging repository of > [repository.apache.org|https://repository.apache.org/content/repositories/] > * Source distribution deployed to the dev repository of > [dist.apache.org|https://dist.apache.org/repos/dist/dev/flink/] > * Check hashes (e.g. shasum -c *.sha512) > * Check signatures (e.g. {{{}gpg --verify > flink-1.2.3-source-release.tar.gz.asc flink-1.2.3-source-release.tar.gz{}}}) > * {{grep}} for legal headers in each file. > * If time allows check the NOTICE files of the modules whose dependencies > have been changed in this release in advance, since the license issues from > time to time pop up during voting. See [Verifying a Flink > Release|https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+Release] > "Checking License" section. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35881) [1.20-rc2] Propose a pull request for website updates
[ https://issues.apache.org/jira/browse/FLINK-35881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo updated FLINK-35881: --- Affects Version/s: 1.20.0 (was: 1.17.0) > [1.20-rc2] Propose a pull request for website updates > - > > Key: FLINK-35881 > URL: https://issues.apache.org/jira/browse/FLINK-35881 > Project: Flink > Issue Type: Sub-task >Affects Versions: 1.20.0 >Reporter: Weijie Guo >Assignee: Weijie Guo >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > > > The final step of building the candidate is to propose a website pull request > containing the following changes: > # update > [apache/flink-web:_config.yml|https://github.com/apache/flink-web/blob/asf-site/_config.yml] > ## update {{FLINK_VERSION_STABLE}} and {{FLINK_VERSION_STABLE_SHORT}} as > required > ## update version references in quickstarts ({{{}q/{}}} directory) as > required > ## (major only) add a new entry to {{flink_releases}} for the release > binaries and sources > ## (minor only) update the entry for the previous release in the series in > {{flink_releases}} > ### Please pay notice to the ids assigned to the download entries. They > should be unique and reflect their corresponding version number. > ## add a new entry to {{release_archive.flink}} > # add a blog post announcing the release in _posts > # add a organized release notes page under docs/content/release-notes and > docs/content.zh/release-notes (like > [https://nightlies.apache.org/flink/flink-docs-release-1.15/release-notes/flink-1.15/]). > The page is based on the non-empty release notes collected from the issues, > and only the issues that affect existing users should be included (e.g., > instead of new functionality). It should be in a separate PR since it would > be merged to the flink project. > (!) Don’t merge the PRs before finalizing the release. > > > h3. Expectations > * Website pull request proposed to list the > [release|http://flink.apache.org/downloads.html] > * (major only) Check {{docs/config.toml}} to ensure that > ** the version constants refer to the new version > ** the {{baseurl}} does not point to {{flink-docs-master}} but > {{flink-docs-release-X.Y}} instead -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35879) [1.20-rc2] Build and stage Java and Python artifacts
[ https://issues.apache.org/jira/browse/FLINK-35879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo updated FLINK-35879: --- Summary: [1.20-rc2] Build and stage Java and Python artifacts (was: CLONE - [Release-1.20] Build and stage Java and Python artifacts) > [1.20-rc2] Build and stage Java and Python artifacts > > > Key: FLINK-35879 > URL: https://issues.apache.org/jira/browse/FLINK-35879 > Project: Flink > Issue Type: Sub-task >Affects Versions: 1.20.0 >Reporter: Weijie Guo >Assignee: Weijie Guo >Priority: Major > Fix For: 1.20.0 > > > # Create a local release branch ((!) this step can not be skipped for minor > releases): > {code:bash} > $ cd ./tools > tools/ $ OLD_VERSION=$CURRENT_SNAPSHOT_VERSION NEW_VERSION=$RELEASE_VERSION > RELEASE_CANDIDATE=$RC_NUM releasing/create_release_branch.sh > {code} > # Tag the release commit: > {code:bash} > $ git tag -s ${TAG} -m "${TAG}" > {code} > # We now need to do several things: > ## Create the source release archive > ## Deploy jar artefacts to the [Apache Nexus > Repository|https://repository.apache.org/], which is the staging area for > deploying the jars to Maven Central > ## Build PyFlink wheel packages > You might want to create a directory on your local machine for collecting the > various source and binary releases before uploading them. Creating the binary > releases is a lengthy process but you can do this on another machine (for > example, in the "cloud"). When doing this, you can skip signing the release > files on the remote machine, download them to your local machine and sign > them there. > # Build the source release: > {code:bash} > tools $ RELEASE_VERSION=$RELEASE_VERSION releasing/create_source_release.sh > {code} > # Stage the maven artifacts: > {code:bash} > tools $ releasing/deploy_staging_jars.sh > {code} > Review all staged artifacts ([https://repository.apache.org/]). They should > contain all relevant parts for each module, including pom.xml, jar, test jar, > source, test source, javadoc, etc. Carefully review any new artifacts. > # Close the staging repository on Apache Nexus. When prompted for a > description, enter “Apache Flink, version X, release candidate Y”. > Then, you need to build the PyFlink wheel packages (since 1.11): > # Set up an azure pipeline in your own Azure account. You can refer to > [Azure > Pipelines|https://cwiki.apache.org/confluence/display/FLINK/Azure+Pipelines#AzurePipelines-Tutorial:SettingupAzurePipelinesforaforkoftheFlinkrepository] > for more details on how to set up azure pipeline for a fork of the Flink > repository. Note that a google cloud mirror in Europe is used for downloading > maven artifacts, therefore it is recommended to set your [Azure organization > region|https://docs.microsoft.com/en-us/azure/devops/organizations/accounts/change-organization-location] > to Europe to speed up the downloads. > # Push the release candidate branch to your forked personal Flink > repository, e.g. 
> {code:bash} > tools $ git push > refs/heads/release-${RELEASE_VERSION}-rc${RC_NUM}:release-${RELEASE_VERSION}-rc${RC_NUM} > {code} > # Trigger the Azure Pipelines manually to build the PyFlink wheel packages > ## Go to your Azure Pipelines Flink project → Pipelines > ## Click the "New pipeline" button on the top right > ## Select "GitHub" → your GitHub Flink repository → "Existing Azure > Pipelines YAML file" > ## Select your branch → Set path to "/azure-pipelines.yaml" → click on > "Continue" → click on "Variables" > ## Then click "New Variable" button, fill the name with "MODE", and the > value with "release". Click "OK" to set the variable and the "Save" button to > save the variables, then back on the "Review your pipeline" screen click > "Run" to trigger the build. > ## You should now see a build where only the "CI build (release)" is running > # Download the PyFlink wheel packages from the build result page after the > jobs of "build_wheels mac" and "build_wheels linux" have finished. > ## Download the PyFlink wheel packages > ### Open the build result page of the pipeline > ### Go to the {{Artifacts}} page (build_wheels linux -> 1 artifact) > ### Click {{wheel_Darwin_build_wheels mac}} and {{wheel_Linux_build_wheels > linux}} separately to download the zip files > ## Unzip these two zip files > {code:bash} > $ cd /path/to/downloaded_wheel_packages > $ unzip wheel_Linux_build_wheels\ linux.zip > $ unzip wheel_Darwin_build_wheels\ mac.zip{code} > ## Create directory {{./dist}} under the directory of {{{}flink-python{}}}: > {code:bash} > $ cd > $ mkdir flink-python/dist{code} > ## Move the unzipped wheel packages to the directory of > {{{}flink-python/dist{}}}: > {code:java} > $ mv /path/to/wheel_Darwin_build_wheels\ mac/* flink-
[jira] [Updated] (FLINK-35882) [1.20-rc2] Vote on the release candidate
[ https://issues.apache.org/jira/browse/FLINK-35882?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo updated FLINK-35882: --- Summary: [1.20-rc2] Vote on the release candidate (was: CLONE - [Release-1.20] Vote on the release candidate) > [1.20-rc2] Vote on the release candidate > > > Key: FLINK-35882 > URL: https://issues.apache.org/jira/browse/FLINK-35882 > Project: Flink > Issue Type: Sub-task >Affects Versions: 1.17.0 >Reporter: Weijie Guo >Assignee: Weijie Guo >Priority: Major > Fix For: 1.17.0 > > > Once you have built and individually reviewed the release candidate, please > share it for the community-wide review. Please review foundation-wide [voting > guidelines|http://www.apache.org/foundation/voting.html] for more information. > Start the review-and-vote thread on the dev@ mailing list. Here’s an email > template; please adjust as you see fit. > {quote}From: Release Manager > To: d...@flink.apache.org > Subject: [VOTE] Release 1.2.3, release candidate #3 > Hi everyone, > Please review and vote on the release candidate #3 for the version 1.2.3, as > follows: > [ ] +1, Approve the release > [ ] -1, Do not approve the release (please provide specific comments) > The complete staging area is available for your review, which includes: > * JIRA release notes [1], > * the official Apache source release and binary convenience releases to be > deployed to dist.apache.org [2], which are signed with the key with > fingerprint [3], > * all artifacts to be deployed to the Maven Central Repository [4], > * source code tag "release-1.2.3-rc3" [5], > * website pull request listing the new release and adding announcement blog > post [6]. > The vote will be open for at least 72 hours. It is adopted by majority > approval, with at least 3 PMC affirmative votes. > Thanks, > Release Manager > [1] link > [2] link > [3] [https://dist.apache.org/repos/dist/release/flink/KEYS] > [4] link > [5] link > [6] link > {quote} > *If there are any issues found in the release candidate, reply on the vote > thread to cancel the vote.* There’s no need to wait 72 hours. Proceed to the > Fix Issues step below and address the problem. However, some issues don’t > require cancellation. For example, if an issue is found in the website pull > request, just correct it on the spot and the vote can continue as-is. > For cancelling a release, the release manager needs to send an email to the > release candidate thread, stating that the release candidate is officially > cancelled. Next, all artifacts created specifically for the RC in the > previous steps need to be removed: > * Delete the staging repository in Nexus > * Remove the source / binary RC files from dist.apache.org > * Delete the source code tag in git > *If there are no issues, reply on the vote thread to close the voting.* Then, > tally the votes in a separate email. Here’s an email template; please adjust > as you see fit. > {quote}From: Release Manager > To: d...@flink.apache.org > Subject: [RESULT] [VOTE] Release 1.2.3, release candidate #3 > I'm happy to announce that we have unanimously approved this release. > There are XXX approving votes, XXX of which are binding: > * approver 1 > * approver 2 > * approver 3 > * approver 4 > There are no disapproving votes. > Thanks everyone! > {quote} > > > h3. 
Expectations > * Community votes to release the proposed candidate, with at least three > approving PMC votes > Any issues that are raised till the vote is over should be either resolved or > moved into the next release (if applicable). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35882) [1.20-rc2] Vote on the release candidate
[ https://issues.apache.org/jira/browse/FLINK-35882?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo updated FLINK-35882: --- Affects Version/s: 1.20.0 (was: 1.17.0) > [1.20-rc2] Vote on the release candidate > > > Key: FLINK-35882 > URL: https://issues.apache.org/jira/browse/FLINK-35882 > Project: Flink > Issue Type: Sub-task >Affects Versions: 1.20.0 >Reporter: Weijie Guo >Assignee: Weijie Guo >Priority: Major > Fix For: 1.17.0 > > > Once you have built and individually reviewed the release candidate, please > share it for the community-wide review. Please review foundation-wide [voting > guidelines|http://www.apache.org/foundation/voting.html] for more information. > Start the review-and-vote thread on the dev@ mailing list. Here’s an email > template; please adjust as you see fit. > {quote}From: Release Manager > To: d...@flink.apache.org > Subject: [VOTE] Release 1.2.3, release candidate #3 > Hi everyone, > Please review and vote on the release candidate #3 for the version 1.2.3, as > follows: > [ ] +1, Approve the release > [ ] -1, Do not approve the release (please provide specific comments) > The complete staging area is available for your review, which includes: > * JIRA release notes [1], > * the official Apache source release and binary convenience releases to be > deployed to dist.apache.org [2], which are signed with the key with > fingerprint [3], > * all artifacts to be deployed to the Maven Central Repository [4], > * source code tag "release-1.2.3-rc3" [5], > * website pull request listing the new release and adding announcement blog > post [6]. > The vote will be open for at least 72 hours. It is adopted by majority > approval, with at least 3 PMC affirmative votes. > Thanks, > Release Manager > [1] link > [2] link > [3] [https://dist.apache.org/repos/dist/release/flink/KEYS] > [4] link > [5] link > [6] link > {quote} > *If there are any issues found in the release candidate, reply on the vote > thread to cancel the vote.* There’s no need to wait 72 hours. Proceed to the > Fix Issues step below and address the problem. However, some issues don’t > require cancellation. For example, if an issue is found in the website pull > request, just correct it on the spot and the vote can continue as-is. > For cancelling a release, the release manager needs to send an email to the > release candidate thread, stating that the release candidate is officially > cancelled. Next, all artifacts created specifically for the RC in the > previous steps need to be removed: > * Delete the staging repository in Nexus > * Remove the source / binary RC files from dist.apache.org > * Delete the source code tag in git > *If there are no issues, reply on the vote thread to close the voting.* Then, > tally the votes in a separate email. Here’s an email template; please adjust > as you see fit. > {quote}From: Release Manager > To: d...@flink.apache.org > Subject: [RESULT] [VOTE] Release 1.2.3, release candidate #3 > I'm happy to announce that we have unanimously approved this release. > There are XXX approving votes, XXX of which are binding: > * approver 1 > * approver 2 > * approver 3 > * approver 4 > There are no disapproving votes. > Thanks everyone! > {quote} > > > h3. Expectations > * Community votes to release the proposed candidate, with at least three > approving PMC votes > Any issues that are raised till the vote is over should be either resolved or > moved into the next release (if applicable). -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
hlteoh37 commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1687672586 ## flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsSinkBuilderTest.java: ## @@ -25,14 +25,6 @@ /** Covers construction, defaults and sanity checking of {@link SqsSinkBuilder}. */ class SqsSinkBuilderTest { -@Test -void elementConverterOfSinkMustBeSetWhenBuilt() { -Assertions.assertThatExceptionOfType(NullPointerException.class) -.isThrownBy(() -> SqsSink.builder().setSqsUrl("sqlUrl").build()) -.withMessageContaining( -"No SerializationSchema was supplied to the SQS Sink builder."); -} - Review Comment: Why did we remove this unit test? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35877] Shade protobuf in flink [flink]
flinkbot commented on PR #25112: URL: https://github.com/apache/flink/pull/25112#issuecomment-2244623653 ## CI report: * 8a98ca6989b8a1192f72c4d0fd58529e99f9bfd5 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35737] Prevent Memory Leak by Closing MemoryExecutionGraphInfoStore on MiniCluster Shutdown [flink]
fengjiajie commented on PR #25009: URL: https://github.com/apache/flink/pull/25009#issuecomment-2244648203 Hi @Samrat002, I was wondering if you had a chance to take another look at this PR. Please let me know if any further changes are needed. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (FLINK-35868) Bump Mongo driver version to support Mongo 7.0+
[ https://issues.apache.org/jira/browse/FLINK-35868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiabao Sun reassigned FLINK-35868: -- Assignee: yux > Bump Mongo driver version to support Mongo 7.0+ > --- > > Key: FLINK-35868 > URL: https://issues.apache.org/jira/browse/FLINK-35868 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: yux >Assignee: yux >Priority: Major > Labels: pull-request-available > > Currently, MongoDB CDC connector depends on mongodb-driver v4.9.1, which > doesn't support Mongo Server 7.0+[1]. Upgrading dependency version would be > nice since Mongo 7.0 has been released nearly a year ago. > [1] https://www.mongodb.com/docs/drivers/java/sync/current/compatibility/ -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35623] Bump mongo-driver version from 4.7.2 to 5.1.1 to support MongoDB 7.0 [flink-connector-mongodb]
Jiabao-Sun commented on PR #36: URL: https://github.com/apache/flink-connector-mongodb/pull/36#issuecomment-2244667146 Hi @yux, could you help review this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35868][cdc-connector][mongodb] Add MongoDB 6.0 & 7.0 tests [flink-cdc]
Jiabao-Sun commented on PR #3489: URL: https://github.com/apache/flink-cdc/pull/3489#issuecomment-2244672599 Shall we bump the driver version from 4.7.1 to 5.1.1 as well? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-35877) Shade protobuf in flink
[ https://issues.apache.org/jira/browse/FLINK-35877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868001#comment-17868001 ] Martijn Visser commented on FLINK-35877: This is tagged for Shaded, but it's not in https://github.com/apache/flink-shaded so this isn't the right component > Shade protobuf in flink > --- > > Key: FLINK-35877 > URL: https://issues.apache.org/jira/browse/FLINK-35877 > Project: Flink > Issue Type: Improvement > Components: BuildSystem / Shaded >Affects Versions: 1.19.2 >Reporter: zhuanshenbsj1 >Priority: Minor > Labels: pull-request-available > Fix For: 1.19.2 > > > Shade the classes in protobuf to avoid class conflict. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35868][cdc-connector][mongodb] Bump dependency version to support MongoDB 7.0 [flink-cdc]
yuxiqian commented on PR #3489: URL: https://github.com/apache/flink-cdc/pull/3489#issuecomment-2244705856 > Shall we bump the driver version from 4.7.1 to 5.1.1 as well? Done, bumped `mongo-kafka` version, too. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35623] Bump mongo-driver version from 4.7.2 to 5.1.1 to support MongoDB 7.0 [flink-connector-mongodb]
Jiabao-Sun merged PR #36: URL: https://github.com/apache/flink-connector-mongodb/pull/36 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (FLINK-35623) Bump mongo-driver version from 4.7.2 to 5.1.1 to support MongoDB 7.0
[ https://issues.apache.org/jira/browse/FLINK-35623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiabao Sun resolved FLINK-35623. Resolution: Implemented main: a7551187d904ed819db085fc36c2cf735913ed5e > Bump mongo-driver version from 4.7.2 to 5.1.1 to support MongoDB 7.0 > > > Key: FLINK-35623 > URL: https://issues.apache.org/jira/browse/FLINK-35623 > Project: Flink > Issue Type: New Feature > Components: Connectors / MongoDB >Affects Versions: mongodb-1.2.0 >Reporter: Jiabao Sun >Assignee: Jiabao Sun >Priority: Major > Labels: pull-request-available > Fix For: mongodb-1.3.0 > > > Bump mongo-driver version from 4.7.2 to 5.1.1 to support MongoDB 7.0 > > [https://www.mongodb.com/docs/drivers/java/sync/current/compatibility/] -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
1996fanrui commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1687687114 ## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategyTest.java: ## @@ -69,46 +69,29 @@ class LocalInputPreferredSlotSharingStrategyTest { private TestingSchedulingExecutionVertex ev21; private TestingSchedulingExecutionVertex ev22; -private Set slotSharingGroups; - -@BeforeEach -void setUp() { -topology = new TestingSchedulingTopology(); - +private void setupCase() { ev11 = topology.newExecutionVertex(JOB_VERTEX_ID_1, 0); ev12 = topology.newExecutionVertex(JOB_VERTEX_ID_1, 1); ev21 = topology.newExecutionVertex(JOB_VERTEX_ID_2, 0); ev22 = topology.newExecutionVertex(JOB_VERTEX_ID_2, 1); -final SlotSharingGroup slotSharingGroup = new SlotSharingGroup(); -slotSharingGroup.addVertexToGroup(JOB_VERTEX_ID_1); -slotSharingGroup.addVertexToGroup(JOB_VERTEX_ID_2); -slotSharingGroups = Collections.singleton(slotSharingGroup); +slotsharingGroup.addVertexToGroup(JOB_VERTEX_ID_1); +slotsharingGroup.addVertexToGroup(JOB_VERTEX_ID_2); } -@Test -void testCoLocationConstraintIsRespected() { -topology.connect(ev11, ev22); -topology.connect(ev12, ev21); - -final CoLocationGroup coLocationGroup = -new TestingCoLocationGroup(JOB_VERTEX_ID_1, JOB_VERTEX_ID_2); -final Set coLocationGroups = Collections.singleton(coLocationGroup); - -final SlotSharingStrategy strategy = -new LocalInputPreferredSlotSharingStrategy( -topology, slotSharingGroups, coLocationGroups); - -assertThat(strategy.getExecutionSlotSharingGroups()).hasSize(2); - assertThat(strategy.getExecutionSlotSharingGroup(ev11.getId()).getExecutionVertexIds()) -.contains(ev11.getId(), ev21.getId()); - assertThat(strategy.getExecutionSlotSharingGroup(ev12.getId()).getExecutionVertexIds()) -.contains(ev12.getId(), ev22.getId()); +@Override +protected SlotSharingStrategy getSlotSharingStrategy( +SchedulingTopology topology, +Set slotSharingGroups, +Set coLocationGroups) { +return new LocalInputPreferredSlotSharingStrategy( +topology, slotSharingGroups, coLocationGroups); } @Test void testInputLocalityIsRespectedWithRescaleEdge() { +setupCase(); Review Comment: It's a little wired to call `setupCase` in too many tests. Could we define a `protected abstract void doSetup();` in `AbstractSlotSharingStrategyTest` and `setUp` call `doSetup()`? ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategy.java: ## @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex; +import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This strategy tries to get a balanced tasks scheduling. Execution vertices, which are belong to + * the same SlotSharingGroup, tend to be put evenly in each ExecutionSlotSharingGroup. Co-location + * con
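The refactoring suggested in the review comment above (an abstract setup hook on the shared base test) could look roughly like the following sketch; apart from `doSetup()` and `setUp()`, the names are made up and this is not the actual Flink test code.

{code:java}
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

// Hypothetical sketch: the abstract base test owns the JUnit lifecycle and
// delegates case-specific wiring to doSetup(), so subclasses no longer need
// to remember to call setupCase() in every test method.
abstract class AbstractSlotSharingStrategyTestSketch {

    protected Object topology; // stands in for TestingSchedulingTopology

    @BeforeEach
    void setUp() {
        topology = new Object(); // common initialization shared by all subclasses
        doSetup();               // subclass-specific vertices and groups
    }

    /** Implemented by each concrete strategy test instead of a manual setupCase() call. */
    protected abstract void doSetup();
}

class LocalInputPreferredStrategyTestSketch extends AbstractSlotSharingStrategyTestSketch {

    @Override
    protected void doSetup() {
        // create execution vertices and slot sharing groups here
    }

    @Test
    void testInputLocality() {
        // no explicit setupCase() call needed; setUp() already invoked doSetup()
    }
}
{code}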
Re: [PR] [FLINK-35877] Shade protobuf in flink [flink]
zhuanshenbsj1 commented on PR #25112: URL: https://github.com/apache/flink/pull/25112#issuecomment-2244784197 > Which part of the Flink runtime/system relies on Protobuf, that would justify shading Protobuf? This is mainly to avoid dependency conflicts between the Protobuf used in the developer's code and the Protobuf used by Flink itself; Protobuf is very commonly used in development. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-33977) Adaptive scheduler may not minimize the number of TMs during downscaling
[ https://issues.apache.org/jira/browse/FLINK-33977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] RocMarshal updated FLINK-33977: --- Attachment: screenshot-1.png > Adaptive scheduler may not minimize the number of TMs during downscaling > > > Key: FLINK-33977 > URL: https://issues.apache.org/jira/browse/FLINK-33977 > Project: Flink > Issue Type: Improvement > Components: Autoscaler, Runtime / Coordination >Affects Versions: 1.18.0, 1.19.0, 1.20.0 >Reporter: Zhanghao Chen >Priority: Major > Attachments: screenshot-1.png > > > Adaptive Scheduler uses SlotAssigner to assign free slots to slot sharing > groups. Currently, there're two implementations of SlotAssigner available: > the > DefaultSlotAssigner that treats all slots and slot sharing groups equally and > the {color:#172b4d}StateLocalitySlotAssigner{color} that assigns slots based > on the number of local key groups to utilize local state recovery. The > scheduler will use the DefaultSlotAssigner when no key group assignment info > is available and use the StateLocalitySlotAssigner otherwise. > > However, none of the SlotAssigner targets at minimizing the number of TMs, > which may produce suboptimal slot assignment under the Application Mode. For > example, when a job with 8 slot sharing groups and 2 TMs (each 4 slots) is > downscaled through the FLIP-291 API to have 4 slot sharing groups instead, > the cluster may still have 2 TMs, one with 1 free slot, and the other with 3 > free slots. For end-users, this implies an ineffective downscaling as the > total cluster resources are not reduced. > > We should take minimizing number of TMs into consideration as well. A > possible approach is to enhance the {color:#172b4d}StateLocalitySlotAssigner: > when the number of free slots exceeds need, sort all the TMs by a score > summing from the allocation scores of all slots on it, remove slots from the > excessive TMs with the lowest score and proceed the remaining slot > assignment.{color} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33977) Adaptive scheduler may not minimize the number of TMs during downscaling
[ https://issues.apache.org/jira/browse/FLINK-33977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868012#comment-17868012 ] RocMarshal commented on FLINK-33977: Thanks [~Zhanghao Chen] for reporting it. This issue is also very serious within our jobs. After scaling down, resources cannot be released. !screenshot-1.png! As shown in the figure, there are 6 TMs, each using only 1 slot. BTW, is there any progress here? I'm interested in this Jira; may I take the ticket? Thank you very much! > Adaptive scheduler may not minimize the number of TMs during downscaling > > > Key: FLINK-33977 > URL: https://issues.apache.org/jira/browse/FLINK-33977 > Project: Flink > Issue Type: Improvement > Components: Autoscaler, Runtime / Coordination >Affects Versions: 1.18.0, 1.19.0, 1.20.0 >Reporter: Zhanghao Chen >Priority: Major > Attachments: screenshot-1.png > > > Adaptive Scheduler uses SlotAssigner to assign free slots to slot sharing > groups. Currently, there're two implementations of SlotAssigner available: > the > DefaultSlotAssigner that treats all slots and slot sharing groups equally and > the {color:#172b4d}StateLocalitySlotAssigner{color} that assigns slots based > on the number of local key groups to utilize local state recovery. The > scheduler will use the DefaultSlotAssigner when no key group assignment info > is available and use the StateLocalitySlotAssigner otherwise. > > However, none of the SlotAssigner targets at minimizing the number of TMs, > which may produce suboptimal slot assignment under the Application Mode. For > example, when a job with 8 slot sharing groups and 2 TMs (each 4 slots) is > downscaled through the FLIP-291 API to have 4 slot sharing groups instead, > the cluster may still have 2 TMs, one with 1 free slot, and the other with 3 > free slots. For end-users, this implies an ineffective downscaling as the > total cluster resources are not reduced. > > We should take minimizing number of TMs into consideration as well. A > possible approach is to enhance the {color:#172b4d}StateLocalitySlotAssigner: > when the number of free slots exceeds need, sort all the TMs by a score > summing from the allocation scores of all slots on it, remove slots from the > excessive TMs with the lowest score and proceed the remaining slot > assignment.{color} -- This message was sent by Atlassian Jira (v8.20.10#820010)
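A minimal sketch of the enhancement described in the issue (score TaskManagers by the allocation scores of their slots and keep only as many as needed) might look like this; the class and method names are simplified stand-ins, not the actual `StateLocalitySlotAssigner` code.

{code:java}
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: keep only as many TMs as the job needs, preferring TMs
// whose free slots carry the highest total allocation score, so the remaining
// TMs hold no slots and can be released.
final class TmMinimizingSelectorSketch {

    /** slotScoresPerTm: allocation score of every free slot, grouped by TM id. */
    static List<String> selectTmsToKeep(
            Map<String, List<Double>> slotScoresPerTm, int slotsPerTm, int requiredSlots) {
        int tmsNeeded = (requiredSlots + slotsPerTm - 1) / slotsPerTm; // ceiling division
        List<String> ranked = new ArrayList<>(slotScoresPerTm.keySet());
        // Sort TMs by the sum of their slot scores, highest first.
        ranked.sort(
                Comparator.comparingDouble(
                                (String tm) ->
                                        slotScoresPerTm.get(tm).stream()
                                                .mapToDouble(Double::doubleValue)
                                                .sum())
                        .reversed());
        return ranked.subList(0, Math.min(tmsNeeded, ranked.size()));
    }
}
{code}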
Re: [PR] [FLINK-35874][cdc-connector][mysql] Check pureBinlogPhaseTables set before call getBinlogPosition method [flink-cdc]
qiaozongmi commented on PR #3488: URL: https://github.com/apache/flink-cdc/pull/3488#issuecomment-2244820245 @leonardBang PTAL -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35874) Check pureBinlogPhaseTables set before call getBinlogPosition method in BinlogSplitReader
[ https://issues.apache.org/jira/browse/FLINK-35874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35874: --- Labels: pull-request-available (was: ) > Check pureBinlogPhaseTables set before call getBinlogPosition method in > BinlogSplitReader > - > > Key: FLINK-35874 > URL: https://issues.apache.org/jira/browse/FLINK-35874 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: Zhongmin Qiao >Assignee: Zhongmin Qiao >Priority: Minor > Labels: pull-request-available > Attachments: image-2024-07-22-19-26-59-158.png, > image-2024-07-22-19-27-19-366.png, image-2024-07-22-19-30-08-989.png, > image-2024-07-22-19-36-20-481.png, image-2024-07-22-19-36-40-581.png, > image-2024-07-22-19-37-35-542.png, image-2024-07-22-21-12-03-316.png > > > The method getBinlogPosition of RecordUtil which is called by > BinlogSplitReader. > shouldEmit is a highly performance-consuming method. This is because it > iterates through the sourceOffset map of the SourceRecord, and during the > iteration, it also performs a toString() conversion on the value. Finally, it > calls the putAll method of BinlogOffsetBuilder to put all the elements > obtained from the iteration into the offsetMap (which involves another map > traversal and hashcode computation). Despite the significant performance > impact of getBinlogPosition, we still need to call it when emitting each > DataChangeRecord, which reduces the efficiency of data processing in Flink > CDC. > !image-2024-07-22-19-26-59-158.png|width=545,height=222! > !image-2024-07-22-19-27-19-366.png|width=545,height=119! > However, we can optimize and avoid frequent invocations of getBinlogPosition > by moving the check pureBinlogPhaseTables.contains(tableId) in the > hasEnterPureBinlogPhase method before calling getBinlogPosition. This way, if > the SourceRecord belongs to a pure binlog phase table, we can directly return > true without the need for the highly performance-consuming getBinlogPosition > method. > diff > !image-2024-07-22-21-12-03-316.png|width=548,height=236! > > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
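The optimization described above amounts to short-circuiting on the cached table set before building the expensive binlog offset. A hedged sketch of that check order (simplified types and names, not the actual `BinlogSplitReader` code):

{code:java}
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the proposed ordering: consult the cached set first
// and only fall back to the costly offset extraction when the table is not
// yet known to be in the pure binlog phase.
final class PureBinlogPhaseCheckSketch {

    private final Set<String> pureBinlogPhaseTables = new HashSet<>();

    boolean hasEnterPureBinlogPhase(String tableId, Object sourceRecord) {
        // Cheap path: table is already known to be past its snapshot high watermark.
        if (pureBinlogPhaseTables.contains(tableId)) {
            return true;
        }
        // Expensive path: build the binlog offset from the record only when needed.
        Object position = extractBinlogPosition(sourceRecord);
        boolean pastHighWatermark = isAfterHighWatermark(tableId, position);
        if (pastHighWatermark) {
            pureBinlogPhaseTables.add(tableId);
        }
        return pastHighWatermark;
    }

    private Object extractBinlogPosition(Object sourceRecord) {
        return new Object(); // stands in for the costly RecordUtils.getBinlogPosition(...)
    }

    private boolean isAfterHighWatermark(String tableId, Object position) {
        return false; // placeholder comparison against the split's high watermark
    }
}
{code}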
[PR] [FLINK-33874][runtime] Support resource request wait mechanism at DefaultDeclarativeSlotPool side for Default Scheduler [flink]
RocMarshal opened a new pull request, #25113: URL: https://github.com/apache/flink/pull/25113 ## What is the purpose of the change Support resource request wait mechanism at DefaultDeclarativeSlotPool side for Default Scheduler ## Brief change log - introduce slot.request.max-interval - implement the mechanism. ## Verifying this change This change is already covered by added corresponding test cases. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (not applicable / **docs** / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
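For context on the first bullet in the change log, a Flink duration option is typically declared via `ConfigOptions`; the snippet below is a rough sketch, and the exact key, default value, and description of `slot.request.max-interval` are assumptions rather than the merged code.

{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

import java.time.Duration;

// Hypothetical declaration of the option introduced by this PR.
public final class SlotRequestOptionsSketch {

    public static final ConfigOption<Duration> SLOT_REQUEST_MAX_INTERVAL =
            ConfigOptions.key("slot.request.max-interval")
                    .durationType()
                    .defaultValue(Duration.ofMillis(20))
                    .withDescription(
                            "Maximum time to wait and batch slot requests before declaring "
                                    + "the resource requirements to the slot pool.");

    private SlotRequestOptionsSketch() {}
}
{code}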
[jira] [Updated] (FLINK-33874) Support resource request wait mechanism at DefaultDeclarativeSlotPool side for Default Scheduler
[ https://issues.apache.org/jira/browse/FLINK-33874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-33874: --- Labels: pull-request-available (was: ) > Support resource request wait mechanism at DefaultDeclarativeSlotPool side > for Default Scheduler > > > Key: FLINK-33874 > URL: https://issues.apache.org/jira/browse/FLINK-33874 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Task >Reporter: RocMarshal >Assignee: RocMarshal >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33874][runtime] Support resource request wait mechanism at DefaultDeclarativeSlotPool side for Default Scheduler [flink]
flinkbot commented on PR #25113: URL: https://github.com/apache/flink/pull/25113#issuecomment-2244834549 ## CI report: * 7e539f5fc908f38eebba16f9480bcf3f5d5009c3 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-35883) Wildcard projection inserts column at wrong place
[ https://issues.apache.org/jira/browse/FLINK-35883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868022#comment-17868022 ] yux commented on FLINK-35883: - I'd like to investigate this based on FLINK-35272. > Wildcard projection inserts column at wrong place > - > > Key: FLINK-35883 > URL: https://issues.apache.org/jira/browse/FLINK-35883 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Reporter: yux >Priority: Major > > In this case where a wildcard projection was declared: > ```yaml > transform: > - projection: \*, 'extras' AS extras > ``` > For upstream schema [a, b, c], transform operator should send [a, b, c, > extras] to downstream. > However, if another column 'd' was inserted at the end, upstream schema would > be [a, b, c, d], and one might expect transformed schema to be [a, b, c, d, > extras]. But it's [a, b, c, extras, d], since `AddColumnEvent{d, > position=LAST}` was applied to [a, b, c, extras] after the projection process. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35883) Wildcard projection inserts column at wrong place
yux created FLINK-35883: --- Summary: Wildcard projection inserts column at wrong place Key: FLINK-35883 URL: https://issues.apache.org/jira/browse/FLINK-35883 Project: Flink Issue Type: Bug Components: Flink CDC Reporter: yux In this case where a wildcard projection was declared: ```yaml transform: - projection: \*, 'extras' AS extras ``` For upstream schema [a, b, c], transform operator should send [a, b, c, extras] to downstream. However, if another column 'd' was inserted at the end, upstream schema would be [a, b, c, d], and one might expect transformed schema to be [a, b, c, d, extras]. But it's [a, b, c, extras, d], since `AddColumnEvent{d, position=LAST}` was applied to [a, b, c, extras] after the projection process. -- This message was sent by Atlassian Jira (v8.20.10#820010)
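To make the ordering problem concrete, the toy snippet below reproduces the two interpretations with plain lists; it is an illustration only and does not use the real Flink CDC event or schema classes.

{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy illustration of the column-ordering issue described in the ticket.
public class WildcardProjectionOrderSketch {
    public static void main(String[] args) {
        // Projected schema after "*, 'extras' AS extras" on upstream [a, b, c]:
        List<String> projected = new ArrayList<>(Arrays.asList("a", "b", "c", "extras"));

        // Upstream later adds column 'd' with position=LAST. Applying that event to
        // the already-projected schema appends after 'extras' (the reported behavior):
        List<String> actual = new ArrayList<>(projected);
        actual.add("d"); // -> [a, b, c, extras, d]

        // What one would expect: 'd' joins the wildcard part, before 'extras'.
        List<String> expected = Arrays.asList("a", "b", "c", "d", "extras");

        System.out.println("actual:   " + actual);
        System.out.println("expected: " + expected);
    }
}
{code}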
Re: [PR] [FLINK-35272][cdc][runtime] Pipeline Transform job supports omitting / renaming calculation column [flink-cdc]
leonardBang commented on code in PR #3285: URL: https://github.com/apache/flink-cdc/pull/3285#discussion_r1687881675 ## flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/ddl/data_types_test.sql: ## @@ -0,0 +1,28 @@ +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +DROP TABLE IF EXISTS DATA_TYPES_TABLE; + +CREATE TABLE DATA_TYPES_TABLE ( +ID INT NOT NULL, +TS DATETIME(0), +NUM DECIMAL(10, 2), +PRIMARY KEY (ID) +); Review Comment: The type is not so complex from my point, could you refer `fullTypesTest` of MySQL CDC Source? ## flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java: ## @@ -56,6 +67,181 @@ public static List createFieldGetters(List colum return fieldGetters; } +/** Restore original data fields from RecordData structure. */ +public static List restoreOriginalData( +@Nullable RecordData recordData, List fieldGetters) { +if (recordData == null) { +return Collections.emptyList(); +} +List actualFields = new ArrayList<>(); +for (RecordData.FieldGetter fieldGetter : fieldGetters) { +actualFields.add(fieldGetter.getFieldOrNull(recordData)); +} +return actualFields; +} + +/** Merge compatible upstream schemas. */ +public static Schema mergeCompatibleSchemas(List schemas) { +if (schemas.isEmpty()) { +return null; +} else if (schemas.size() == 1) { +return schemas.get(0); +} else { +Schema outputSchema = null; +for (Schema schema : schemas) { +outputSchema = mergeSchema(outputSchema, schema); +} +return outputSchema; +} +} + +/** Try to combine two schemas with potential incompatible type. 
*/ +@VisibleForTesting +public static Schema mergeSchema(@Nullable Schema lSchema, Schema rSchema) { +if (lSchema == null) { +return rSchema; +} +if (lSchema.getColumnCount() != rSchema.getColumnCount()) { +throw new IllegalStateException( +String.format( +"Unable to merge schema %s and %s with different column counts.", +lSchema, rSchema)); +} +if (!lSchema.primaryKeys().equals(rSchema.primaryKeys())) { +throw new IllegalStateException( +String.format( +"Unable to merge schema %s and %s with different primary keys.", +lSchema, rSchema)); +} +if (!lSchema.partitionKeys().equals(rSchema.partitionKeys())) { +throw new IllegalStateException( +String.format( +"Unable to merge schema %s and %s with different partition keys.", +lSchema, rSchema)); +} +if (!lSchema.options().equals(rSchema.options())) { +throw new IllegalStateException( +String.format( +"Unable to merge schema %s and %s with different options.", +lSchema, rSchema)); +} +if (!Objects.equals(lSchema.comment(), rSchema.comment())) { +throw new IllegalStateException( +String.format( +"Unable to merge schema %s and %s with different comments.", +lSchema, rSchema)); +} + +List leftColumns = lSchema.getColumns(); +List rightColumns = rSchema.getColumns(); + +List mergedColumns = +IntStream.range(0, lSchema.getColumnCount()) +.mapToObj(i -> mergeColumn(leftColumns.get(i), rightColumns.get(i))) +.collect(Collectors.toList()); + +return lSchema.copy(mergedColumns); +} + +/** Try to combine two columns with potential incompatible type. */ +@VisibleForTesting +public static Column mergeColumn(Column lColumn, Column rColumn) { +if (!Objects.equals(lColumn.getName(), rColumn.getName())) { +thro
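The merging logic quoted above pairs columns up by position and fails fast on structural mismatches before merging each column pair. A condensed sketch of that shape (not the actual Flink CDC `SchemaUtils` code, whose `mergeColumn` additionally resolves a common type for incompatible columns):

{code:java}
import java.util.ArrayList;
import java.util.List;

// Condensed sketch of positional schema merging: equal column counts are
// required, and each column pair is merged independently.
final class SchemaMergeSketch {

    record Col(String name, String type) {}

    static List<Col> mergeByPosition(List<Col> left, List<Col> right) {
        if (left.size() != right.size()) {
            throw new IllegalStateException("Unable to merge schemas with different column counts.");
        }
        List<Col> merged = new ArrayList<>();
        for (int i = 0; i < left.size(); i++) {
            Col l = left.get(i);
            Col r = right.get(i);
            if (!l.name().equals(r.name())) {
                throw new IllegalStateException("Unable to merge columns with different names.");
            }
            // Placeholder for type widening: the real mergeColumn resolves a common type here.
            merged.add(l.type().equals(r.type())
                    ? l
                    : new Col(l.name(), "common(" + l.type() + "," + r.type() + ")"));
        }
        return merged;
    }
}
{code}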
Re: [PR] [FLINK-35868][cdc-connector][mongodb] Bump dependency version to support MongoDB 7.0 [flink-cdc]
leonardBang commented on code in PR #3489: URL: https://github.com/apache/flink-cdc/pull/3489#discussion_r1687920162 ## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/pom.xml: ## @@ -69,7 +69,7 @@ limitations under the License. org.mongodb mongodb-driver-sync -4.9.1 +5.1.1 Review Comment: Could you also update the docs? The older (release-3.1) version docs also seem to use an incorrect driver version. https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/overview/ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-35884) Support
JunboWang created FLINK-35884: - Summary: Support Key: FLINK-35884 URL: https://issues.apache.org/jira/browse/FLINK-35884 Project: Flink Issue Type: Improvement Components: Flink CDC Reporter: JunboWang -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35884) Pipeline connector MySQL support snapshot.chunk.key-column
[ https://issues.apache.org/jira/browse/FLINK-35884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] JunboWang updated FLINK-35884: -- Summary: Pipeline connector MySQL support snapshot.chunk.key-column (was: Support) > Pipeline connector MySQL support snapshot.chunk.key-column > -- > > Key: FLINK-35884 > URL: https://issues.apache.org/jira/browse/FLINK-35884 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: JunboWang >Priority: Minor > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]
ferenc-csaky commented on code in PR #821: URL: https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1687799832 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java: ## @@ -426,6 +438,14 @@ public static String operatorConfigKey(String key) { .withDescription( "Max allowed checkpoint age for initiating last-state upgrades on running jobs. If a checkpoint is not available within the desired age (and nothing in progress) a savepoint will be triggered."); +@Documentation.Section(SECTION_DYNAMIC) +public static final ConfigOption OPERATOR_JOB_SAVEPOINT_DISPOSE_ON_DELETE = +operatorConfig("savepoint.dispose-on-delete") +.booleanType() +.defaultValue(false) +.withDescription( +"Savepoint data for FlinkStateSnapshot resources created by the operator during upgrades and periodic savepoints will be disposed of automatically when the generated Kubernetes resource is deleted."); Review Comment: nit: "...savepoints will be disposed ~of~ automatically when..." ## flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkStateSnapshotState.java: ## @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.api.status; + +import org.apache.flink.annotation.Experimental; + +/** Describes current snapshot state. */ +@Experimental +public enum FlinkStateSnapshotState { Review Comment: Maybe move this inside the `FlinkStateSnapshotStatus`, and then the enum name could be simply `State`. This might improve the overall readability and we can work around the kind of confusing wording here a bit better. I see that `ReconciliationStatus` and `ReconciliationState` follows the current setup and this just setup the same way, but the wording is straightforward there. ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java: ## @@ -297,6 +297,18 @@ public static String operatorConfigKey(String key) { "Custom HTTP header for HttpArtifactFetcher. The header will be applied when getting the session job artifacts. " + "Expected format: headerKey1:headerValue1,headerKey2:headerValue2."); +@Documentation.Section(SECTION_DYNAMIC) +public static final ConfigOption SNAPSHOT_RESOURCE_ENABLED = +operatorConfig("snapshot.resource.enabled") +.booleanType() +.defaultValue(true) +.withDescription( +"Create new FlinkStateSnapshot resources for storing snapshots. " ++ "Disable if you wish to use the deprecated mode and save snapshot results to " ++ "FlinkDeployment/FlinkSessionJob status fields. 
The Operator will fallback to the " Review Comment: nit: "The Operator will fallback to ~the~" ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotController.java: ## @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +pac
[jira] [Commented] (FLINK-33977) Adaptive scheduler may not minimize the number of TMs during downscaling
[ https://issues.apache.org/jira/browse/FLINK-33977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868038#comment-17868038 ] Zhanghao Chen commented on FLINK-33977: --- Hi [~RocMarshal], there's no progress on my side. Please go ahead~ > Adaptive scheduler may not minimize the number of TMs during downscaling > > > Key: FLINK-33977 > URL: https://issues.apache.org/jira/browse/FLINK-33977 > Project: Flink > Issue Type: Improvement > Components: Autoscaler, Runtime / Coordination >Affects Versions: 1.18.0, 1.19.0, 1.20.0 >Reporter: Zhanghao Chen >Priority: Major > Attachments: screenshot-1.png > > > Adaptive Scheduler uses SlotAssigner to assign free slots to slot sharing > groups. Currently, there're two implementations of SlotAssigner available: > the > DefaultSlotAssigner that treats all slots and slot sharing groups equally and > the {color:#172b4d}StateLocalitySlotAssigner{color} that assigns slots based > on the number of local key groups to utilize local state recovery. The > scheduler will use the DefaultSlotAssigner when no key group assignment info > is available and use the StateLocalitySlotAssigner otherwise. > > However, none of the SlotAssigner targets at minimizing the number of TMs, > which may produce suboptimal slot assignment under the Application Mode. For > example, when a job with 8 slot sharing groups and 2 TMs (each 4 slots) is > downscaled through the FLIP-291 API to have 4 slot sharing groups instead, > the cluster may still have 2 TMs, one with 1 free slot, and the other with 3 > free slots. For end-users, this implies an ineffective downscaling as the > total cluster resources are not reduced. > > We should take minimizing number of TMs into consideration as well. A > possible approach is to enhance the {color:#172b4d}StateLocalitySlotAssigner: > when the number of free slots exceeds need, sort all the TMs by a score > summing from the allocation scores of all slots on it, remove slots from the > excessive TMs with the lowest score and proceed the remaining slot > assignment.{color} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35868][cdc-connector][mongodb] Bump dependency version to support MongoDB 7.0 [flink-cdc]
yuxiqian commented on PR #3489: URL: https://github.com/apache/flink-cdc/pull/3489#issuecomment-2245079611 I'll investigate this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35884) Pipeline connector MySQL support snapshot.chunk.key-column
[ https://issues.apache.org/jira/browse/FLINK-35884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] JunboWang updated FLINK-35884: -- Description: flink-connector-mysql-cdc `MySqlSourceOptions` supports specifying the parameter scan.incremental.snapshot.chunk.key-column to divide chunks, pipeline connector should also support. > Pipeline connector MySQL support snapshot.chunk.key-column > -- > > Key: FLINK-35884 > URL: https://issues.apache.org/jira/browse/FLINK-35884 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: JunboWang >Priority: Minor > > flink-connector-mysql-cdc `MySqlSourceOptions` supports specifying the > parameter scan.incremental.snapshot.chunk.key-column to divide chunks, > pipeline connector should also support. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35884) Pipeline connector MySQL support scan.incremental.snapshot.chunk.key-column
[ https://issues.apache.org/jira/browse/FLINK-35884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] JunboWang updated FLINK-35884: -- Summary: Pipeline connector MySQL support scan.incremental.snapshot.chunk.key-column (was: Pipeline connector MySQL support snapshot.chunk.key-column) > Pipeline connector MySQL support scan.incremental.snapshot.chunk.key-column > --- > > Key: FLINK-35884 > URL: https://issues.apache.org/jira/browse/FLINK-35884 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: JunboWang >Priority: Minor > > flink-connector-mysql-cdc `MySqlSourceOptions` supports specifying the > parameter scan.incremental.snapshot.chunk.key-column to divide chunks, > pipeline connector should also support. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35884) Pipeline MySQL connector support scan.incremental.snapshot.chunk.key-column
[ https://issues.apache.org/jira/browse/FLINK-35884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] JunboWang updated FLINK-35884: -- Summary: Pipeline MySQL connector support scan.incremental.snapshot.chunk.key-column (was: Pipeline connector MySQL support scan.incremental.snapshot.chunk.key-column) > Pipeline MySQL connector support scan.incremental.snapshot.chunk.key-column > --- > > Key: FLINK-35884 > URL: https://issues.apache.org/jira/browse/FLINK-35884 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: JunboWang >Priority: Minor > > flink-connector-mysql-cdc `MySqlSourceOptions` supports specifying the > parameter scan.incremental.snapshot.chunk.key-column to divide chunks, > pipeline connector should also support. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-35884] MySQL pipeline support snapshot chunk key-column [flink-cdc]
beryllw opened a new pull request, #3490: URL: https://github.com/apache/flink-cdc/pull/3490 flink-connector-mysql-cdc `MySqlSourceOptions` supports specifying the parameter scan.incremental.snapshot.chunk.key-column to divide chunks; the pipeline connector should support this as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35884) Pipeline MySQL connector supports setting chunk column key
[ https://issues.apache.org/jira/browse/FLINK-35884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] JunboWang updated FLINK-35884: -- Summary: Pipeline MySQL connector supports setting chunk column key (was: Pipeline MySQL connector support scan.incremental.snapshot.chunk.key-column) > Pipeline MySQL connector supports setting chunk column key > -- > > Key: FLINK-35884 > URL: https://issues.apache.org/jira/browse/FLINK-35884 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: JunboWang >Priority: Minor > > flink-connector-mysql-cdc `MySqlSourceOptions` supports specifying the > parameter scan.incremental.snapshot.chunk.key-column to divide chunks, > pipeline connector should also support. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35884) Pipeline MySQL connector supports setting chunk column key
[ https://issues.apache.org/jira/browse/FLINK-35884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35884: --- Labels: pull-request-available (was: ) > Pipeline MySQL connector supports setting chunk column key > -- > > Key: FLINK-35884 > URL: https://issues.apache.org/jira/browse/FLINK-35884 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: JunboWang >Priority: Minor > Labels: pull-request-available > > flink-connector-mysql-cdc `MySqlSourceOptions` supports specifying the > parameter scan.incremental.snapshot.chunk.key-column to divide chunks, > pipeline connector should also support. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35242] Supports per-SE type configuration & "lenient" evolution behavior [flink-cdc]
leonardBang commented on code in PR #3339: URL: https://github.com/apache/flink-cdc/pull/3339#discussion_r1687943729 ## flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/DataSink.java: ## @@ -30,5 +33,5 @@ public interface DataSink { EventSinkProvider getEventSinkProvider(); /** Get the {@link MetadataApplier} for applying metadata changes to external systems. */ -MetadataApplier getMetadataApplier(); +MetadataApplier getMetadataApplier(Set enabledEventTypes); Review Comment: This is a public API; we should keep backward compatibility. ## flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/MetadataApplier.java: ## @@ -19,13 +19,22 @@ import org.apache.flink.cdc.common.annotation.PublicEvolving; import org.apache.flink.cdc.common.event.SchemaChangeEvent; +import org.apache.flink.cdc.common.event.SchemaChangeEventType; +import org.apache.flink.cdc.common.exceptions.SchemaEvolveException; import java.io.Serializable; +import java.util.Set; /** {@code MetadataApplier} is used to apply metadata changes to external systems. */ @PublicEvolving public interface MetadataApplier extends Serializable { +/** Checks if this metadata applier should handle this event type. */ +boolean acceptsSchemaEvolutionType(SchemaChangeEventType schemaChangeEventType); + +/** Checks what kind of schema change events downstream can handle. */ +Set getSupportedSchemaEvolutionTypes(); + Review Comment: Please consider compatibility when changing an existing public API. Imagine the case where a user has a custom pipeline connector: will it still work after they bump the Flink CDC version? ## flink-cdc-common/src/main/java/org/apache/flink/cdc/common/exceptions/SchemaEvolveException.java: ## @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.common.exceptions; + +import org.apache.flink.cdc.common.event.SchemaChangeEvent; + +import javax.annotation.Nullable; + +/** An exception occurred during schema evolution. */ +public class SchemaEvolveException extends Exception { Review Comment: extends `FlinkRuntimeException` or `CDCRuntimeException` ? ## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisDataSink.java: ## @@ -74,7 +76,7 @@ public EventSinkProvider getEventSinkProvider() { } @Override -public MetadataApplier getMetadataApplier() { -return new DorisMetadataApplier(dorisOptions, configuration); +public MetadataApplier getMetadataApplier(Set enabledEventTypes) { +return new DorisMetadataApplier(dorisOptions, configuration, enabledEventTypes); } Review Comment: You would need to change all connectors in one PR if you bring an incompatible change to a public API; we should avoid this kind of change. 
## flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/SchemaChangeBehavior.java: ## @@ -22,7 +22,9 @@ /** Behavior for handling schema changes. */ @PublicEvolving public enum SchemaChangeBehavior { -EVOLVE, IGNORE, Review Comment: you can add a note about the order adjustment ## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/metrics/SchemaOperatorMetrics.java: ## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and
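One common way to address the compatibility concern raised in this review is to give the new interface methods default implementations so that existing connectors keep compiling and behaving as before. The sketch below only illustrates that technique; the actual resolution in the PR may differ.

{code:java}
import java.io.Serializable;
import java.util.EnumSet;
import java.util.Set;

// Hypothetical backward-compatible evolution of the interface: new methods get
// permissive defaults, so connectors compiled against the old API still work.
interface MetadataApplierSketch extends Serializable {

    // Stand-in for org.apache.flink.cdc.common.event.SchemaChangeEventType.
    enum EventType { ADD_COLUMN, DROP_COLUMN, RENAME_COLUMN, ALTER_COLUMN_TYPE, CREATE_TABLE }

    /** Existing method, unchanged. */
    void applySchemaChange(Object schemaChangeEvent);

    /** New method with a permissive default: accept every event type unless overridden. */
    default boolean acceptsSchemaEvolutionType(EventType type) {
        return true;
    }

    /** New method with a default advertising full support. */
    default Set<EventType> getSupportedSchemaEvolutionTypes() {
        return EnumSet.allOf(EventType.class);
    }
}
{code}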
Re: [PR] [FLINK-35857] fix redeploy failed deployment without latest checkpoint [flink-kubernetes-operator]
gyfora commented on code in PR #855: URL: https://github.com/apache/flink-kubernetes-operator/pull/855#discussion_r1687973449 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java: ## @@ -289,6 +289,8 @@ public boolean reconcileOtherChanges(FlinkResourceContext ctx) throws Except LOG.info("Stopping failed Flink job..."); cleanupAfterFailedJob(ctx); status.setError(null); +ReconciliationUtils.updateStatusForDeployedSpec( +ctx.getResource(), ctx.getDeployConfig(ctx.getResource().getSpec()), clock); Review Comment: I don't really understand how this would solve the issue... Could you please explain why you think it solves it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
hlteoh37 commented on PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#issuecomment-2245153099 @19priyadhingra The tests seem to be failing. Can we please take a look? Also, it would be good if we squashed the commits! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-35885) SlicingWindowOperator should not process watermark with proctime
Baozhu Zhao created FLINK-35885: --- Summary: SlicingWindowOperator should not process watermark with proctime Key: FLINK-35885 URL: https://issues.apache.org/jira/browse/FLINK-35885 Project: Flink Issue Type: Bug Components: Table SQL / Runtime Affects Versions: 1.17.2, 1.13.6 Environment: flink 1.13.6 or flink 1.17.2 Reporter: Baozhu Zhao We have discovered an unexpected case where abnormal data with a count of 0 occurs when performing proctime window aggregation on data with a watermark, The SQL is as follows {code:sql} CREATE TABLE s1 ( id INT, event_time TIMESTAMP(3), name string, proc_time AS PROCTIME (), WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND ) WITH ('connector' = 'my-source') ; SELECT * FROM ( SELECT name, COUNT(id) AS total_count, window_start, window_end FROM TABLE ( TUMBLE ( TABLE s1, DESCRIPTOR (proc_time), INTERVAL '30' SECONDS ) ) GROUP BY window_start, window_end, name ) WHERE total_count = 0; {code} For detailed test code, please refer to xxx -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35285) Autoscaler key group optimization can interfere with scale-down.max-factor
[ https://issues.apache.org/jira/browse/FLINK-35285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868069#comment-17868069 ] Gyula Fora commented on FLINK-35285: Hey [~trystan] sorry for the late reply. I see the problem... Once you are close to the max parallelism you need a higher scale-down factor, but you generally want to keep this fairly conservative. One approach that we could also consider is that we always allow scaling up/down to the nearest valid parallelism regardless of the max scaling factors. I think this is a bit risky and I am not too much in favour of it. For your particular use case, it may be best to solve the issue by changing your max-parallelism setting from 120 to something like 720 or 1260. As long as your job parallelism is very small compared to the max parallelism and we have a lot of divisors, the algorithm has a lot of flexibility even with small scale factors. In other words, you could choose your max parallelism based on the desired scale factor. For small scale factors like 0.1 you will need a max parallelism that is high compared to the usual parallelism. You can still set a vertex max parallelism in the autoscaler to avoid scaling very high in practice. > Autoscaler key group optimization can interfere with scale-down.max-factor > -- > > Key: FLINK-35285 > URL: https://issues.apache.org/jira/browse/FLINK-35285 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Reporter: Trystan >Priority: Minor > > When setting a less aggressive scale down limit, the key group optimization > can prevent a vertex from scaling down at all. It will hunt from target > upwards to maxParallelism/2, and will always find currentParallelism again. > > A simple test trying to scale down from a parallelism of 60 with a > scale-down.max-factor of 0.2: > {code:java} > assertEquals(48, JobVertexScaler.scale(60, inputShipStrategies, 360, .8, 8, > 360)); {code} > > It seems reasonable to make a good attempt to spread data across subtasks, > but not at the expense of total deadlock. The problem is that during scale > down it doesn't actually ensure that newParallelism will be < > currentParallelism. The only workaround is to set a scale down factor large > enough such that it finds the next lowest divisor of the maxParallelism. > > Clunky, but something to ensure it can make at least some progress. There is > another test that now fails, but just to illustrate the point: > {code:java} > for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) > { > if ((scaleFactor < 1 && p < currentParallelism) || (scaleFactor > 1 && p > > currentParallelism)) { > if (maxParallelism % p == 0) { > return p; > } > } > } {code} > > Perhaps this is by design and not a bug, but total failure to scale down in > order to keep optimized key groups does not seem ideal. > > Key group optimization block: > [https://github.com/apache/flink-kubernetes-operator/blob/fe3d24e4500d6fcaed55250ccc816546886fd1cf/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L296C1-L303C10] -- This message was sent by Atlassian Jira (v8.20.10#820010)
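The arithmetic behind the max-parallelism advice is that a richer divisor set gives the scaler more valid target parallelisms to pick from under a small scale factor. A quick, purely illustrative way to compare the values mentioned above:

{code:java}
// Counts how many valid target parallelisms (divisors) a max-parallelism offers.
public class DivisorCountSketch {
    static int countDivisors(int n) {
        int count = 0;
        for (int i = 1; i <= n; i++) {
            if (n % i == 0) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println("120  -> " + countDivisors(120) + " divisors");  // 16
        System.out.println("720  -> " + countDivisors(720) + " divisors");  // 30
        System.out.println("1260 -> " + countDivisors(1260) + " divisors"); // 36
    }
}
{code}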
Re: [PR] [FLINK-35776] Simplify job status handling [flink-kubernetes-operator]
gyfora merged PR #851: URL: https://github.com/apache/flink-kubernetes-operator/pull/851 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]
mateczagany commented on code in PR #821: URL: https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1688087051 ## flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobSpec.java: ## @@ -90,8 +100,8 @@ public class JobSpec implements Diffable { /** * Nonce used to trigger a full redeployment of the job from the savepoint path specified in - * initialSavepointPath. In order to trigger redeployment, change the number to a different - * non-null value. Rollback is not possible after redeployment. + * initialSavepointPath or initialSavepointName. In order to trigger redeployment, change the Review Comment: Forgot to rename it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-35776) Simplify job observe logic
[ https://issues.apache.org/jira/browse/FLINK-35776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora closed FLINK-35776. -- Resolution: Fixed merged to main 15f648ce46537d6a1df3a87cfd653a3f855d0dcd > Simplify job observe logic > -- > > Key: FLINK-35776 > URL: https://issues.apache.org/jira/browse/FLINK-35776 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: Gyula Fora >Assignee: Gyula Fora >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.10.0 > > > There is a fairly complicated listing / observe logic for jobs currently that > is no longer necessary as we have a stable logic to always record the jobID > in the status before submission. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-32682] Make it possible to use query time based time functions in streaming mode [flink]
dawidwys closed pull request #23083: [FLINK-32682] Make it possible to use query time based time functions in streaming mode URL: https://github.com/apache/flink/pull/23083 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35885) SlicingWindowOperator should not process watermark with proctime
[ https://issues.apache.org/jira/browse/FLINK-35885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Baozhu Zhao updated FLINK-35885: Description: We have discovered an unexpected case where abnormal data with a count of 0 occurs when performing proctime window aggregation on data with a watermark, The SQL is as follows {code:sql} CREATE TABLE s1 ( id INT, event_time TIMESTAMP(3), name string, proc_time AS PROCTIME (), WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND ) WITH ('connector' = 'my-source') ; SELECT * FROM ( SELECT name, COUNT(id) AS total_count, window_start, window_end FROM TABLE ( TUMBLE ( TABLE s1, DESCRIPTOR (proc_time), INTERVAL '30' SECONDS ) ) GROUP BY window_start, window_end, name ) WHERE total_count = 0; {code} For detailed test code, please refer to https://github.com/xingsuo-zbz/flink/blob/zbz/117/proc-agg-window-process-watermark-bug-test/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/bug/WindowBugTest.java was: We have discovered an unexpected case where abnormal data with a count of 0 occurs when performing proctime window aggregation on data with a watermark, The SQL is as follows {code:sql} CREATE TABLE s1 ( id INT, event_time TIMESTAMP(3), name string, proc_time AS PROCTIME (), WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND ) WITH ('connector' = 'my-source') ; SELECT * FROM ( SELECT name, COUNT(id) AS total_count, window_start, window_end FROM TABLE ( TUMBLE ( TABLE s1, DESCRIPTOR (proc_time), INTERVAL '30' SECONDS ) ) GROUP BY window_start, window_end, name ) WHERE total_count = 0; {code} For detailed test code, please refer to xxx Environment: flink 1.13.6 with blink or flink 1.17.2 (was: flink 1.13.6 or flink 1.17.2) > SlicingWindowOperator should not process watermark with proctime > > > Key: FLINK-35885 > URL: https://issues.apache.org/jira/browse/FLINK-35885 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.13.6, 1.17.2 > Environment: flink 1.13.6 with blink or flink 1.17.2 >Reporter: Baozhu Zhao >Priority: Major > > We have discovered an unexpected case where abnormal data with a count of 0 > occurs when performing proctime window aggregation on data with a watermark, > The SQL is as follows > {code:sql} > CREATE TABLE s1 ( > id INT, > event_time TIMESTAMP(3), > name string, > proc_time AS PROCTIME (), > WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND > ) > WITH > ('connector' = 'my-source') > ; > SELECT > * > FROM > ( > SELECT > name, > COUNT(id) AS total_count, > window_start, > window_end > FROM > TABLE ( > TUMBLE ( > TABLE s1, > DESCRIPTOR (proc_time), > INTERVAL '30' SECONDS > ) > ) > GROUP BY > window_start, > window_end, > name > ) > WHERE > total_count = 0; > {code} > For detailed test code, please refer to > https://github.com/xingsuo-zbz/flink/blob/zbz/117/proc-agg-window-process-watermark-bug-test/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/bug/WindowBugTest.java -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]
mateczagany commented on code in PR #821: URL: https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1688100790 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java: ## @@ -426,6 +438,14 @@ public static String operatorConfigKey(String key) { .withDescription( "Max allowed checkpoint age for initiating last-state upgrades on running jobs. If a checkpoint is not available within the desired age (and nothing in progress) a savepoint will be triggered."); +@Documentation.Section(SECTION_DYNAMIC) +public static final ConfigOption OPERATOR_JOB_SAVEPOINT_DISPOSE_ON_DELETE = +operatorConfig("savepoint.dispose-on-delete") +.booleanType() +.defaultValue(false) +.withDescription( +"Savepoint data for FlinkStateSnapshot resources created by the operator during upgrades and periodic savepoints will be disposed of automatically when the generated Kubernetes resource is deleted."); Review Comment: I believe it's the correct way to say it (to dispose of something) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]
mateczagany commented on code in PR #821: URL: https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1688104509 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotContext.java: ## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.controller; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; +import org.apache.flink.kubernetes.operator.api.FlinkDeployment; +import org.apache.flink.kubernetes.operator.api.FlinkSessionJob; +import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot; +import org.apache.flink.kubernetes.operator.api.spec.JobKind; +import org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus; +import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; +import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration; + +import io.fabric8.kubernetes.client.KubernetesClient; +import io.javaoperatorsdk.operator.api.reconciler.Context; +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +import java.util.Optional; + +/** Context for reconciling a snapshot. */ +@Getter +@RequiredArgsConstructor +public class FlinkStateSnapshotContext { + +private final FlinkStateSnapshot resource; +private final FlinkStateSnapshotStatus originalStatus; +private final Context josdkContext; +private final FlinkConfigManager configManager; + +private FlinkOperatorConfiguration operatorConfig; +private Configuration referencedJobObserveConfig; +private FlinkDeployment referencedJobFlinkDeployment; + +/** + * @return Operator configuration for this resource. + */ +public FlinkOperatorConfiguration getOperatorConfig() { +if (operatorConfig != null) { +return operatorConfig; +} +return operatorConfig = +configManager.getOperatorConfiguration( +getResource().getMetadata().getNamespace(), null); Review Comment: I have flipped the ifs for now, I will come back to check using Lombok lazy getter -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
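The Lombok alternative mentioned here would replace the hand-rolled memoization with `@Getter(lazy = true)`, which evaluates the initializer once on first access and caches the result. A small sketch of that pattern with illustrative names:

{code:java}
import lombok.Getter;

// Sketch of Lombok's lazy getter: loadConfig() runs once, on the first call to
// getOperatorConfig(), and the result is cached thereafter.
public class LazyConfigHolderSketch {

    @Getter(lazy = true)
    private final OperatorConfig operatorConfig = loadConfig();

    private OperatorConfig loadConfig() {
        // stands in for configManager.getOperatorConfiguration(namespace, null)
        return new OperatorConfig();
    }

    public static class OperatorConfig {}
}
{code}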
Re: [PR] [FLINK-35292] Set dummy savepoint path during last-state upgrade [flink-kubernetes-operator]
gyfora commented on code in PR #849: URL: https://github.com/apache/flink-kubernetes-operator/pull/849#discussion_r1688103240 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java: ## @@ -265,7 +274,7 @@ protected void restoreJob( throws Exception { Optional savepointOpt = Optional.empty(); -if (spec.getJob().getUpgradeMode() != UpgradeMode.STATELESS) { +if (spec.getJob().getUpgradeMode() == UpgradeMode.SAVEPOINT) { Review Comment: The problem is that this cannot be easily done as what savepoint is passed there will depend on where / why the deploy is called. It is for example different during initial deployments vs upgrades. While the optional parameter is not great I would like to avoid adding more complexity to the deploy method. It may make sense to actually break up the deploy method in 3 parts, stateless/last-state/savepoint which would get rid of the optional params + the requireHaMetadata flag at the same time at the expense of more methods. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
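The refactoring floated at the end of this comment, splitting the deploy path by upgrade mode instead of threading an optional savepoint and a requireHaMetadata flag through one method, might be outlined as below. This is only a sketch of the design choice, not operator code.

{code:java}
// Outline of the suggested split: one entry point per upgrade mode, so neither
// an Optional savepoint nor a requireHaMetadata flag has to be passed around.
interface JobDeployerSketch<SPEC> {

    /** Deploy without restoring any state. */
    void deployStateless(SPEC spec);

    /** Deploy restoring from HA metadata; fails if the metadata is missing. */
    void deployLastState(SPEC spec);

    /** Deploy restoring from an explicit savepoint path. */
    void deployFromSavepoint(SPEC spec, String savepointPath);
}
{code}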
[jira] [Closed] (FLINK-35292) Set dummy savepoint path during last-state upgrade
[ https://issues.apache.org/jira/browse/FLINK-35292?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora closed FLINK-35292. -- Resolution: Fixed merged to main 207b149f9b556d68c4ab98d16cdde2f7820659c0 > Set dummy savepoint path during last-state upgrade > -- > > Key: FLINK-35292 > URL: https://issues.apache.org/jira/browse/FLINK-35292 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: Gyula Fora >Assignee: Gyula Fora >Priority: Major > Labels: pull-request-available > > Currently the operator always sets the savepoint path even if last-state (HA > metadata) must be used. > This can be misleading to users as the set savepoint path normally should > never take effect and can actually lead to incorrect state restored if the HA > metadata is deleted by the user at the wrong moment. > To avoid this we can set an explicit dummy savepoint path which will prevent > restoring from it accidentally. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]
mateczagany commented on code in PR #821: URL: https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1688126716 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotController.java: ## @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.controller; + +import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot; +import org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus; +import org.apache.flink.kubernetes.operator.observer.snapshot.StateSnapshotObserver; +import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; +import org.apache.flink.kubernetes.operator.reconciler.snapshot.StateSnapshotReconciler; +import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory; +import org.apache.flink.kubernetes.operator.utils.EventRecorder; +import org.apache.flink.kubernetes.operator.utils.EventSourceUtils; +import org.apache.flink.kubernetes.operator.utils.FlinkStateSnapshotUtils; +import org.apache.flink.kubernetes.operator.utils.StatusRecorder; +import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator; + +import io.javaoperatorsdk.operator.api.reconciler.Cleaner; +import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; +import io.javaoperatorsdk.operator.api.reconciler.DeleteControl; +import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler; +import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl; +import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; +import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer; +import io.javaoperatorsdk.operator.api.reconciler.Reconciler; +import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; +import io.javaoperatorsdk.operator.processing.event.source.EventSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +/** Controller that runs the main reconcile loop for {@link FlinkStateSnapshot}. 
*/ +@ControllerConfiguration +public class FlinkStateSnapshotController +implements Reconciler, +ErrorStatusHandler, +EventSourceInitializer, +Cleaner { + +private static final Logger LOG = LoggerFactory.getLogger(FlinkStateSnapshotController.class); + +private final Set validators; +private final FlinkResourceContextFactory ctxFactory; +private final StateSnapshotReconciler reconciler; +private final StateSnapshotObserver observer; +private final EventRecorder eventRecorder; +private final StatusRecorder statusRecorder; + +public FlinkStateSnapshotController( +Set validators, +FlinkResourceContextFactory ctxFactory, +StateSnapshotReconciler reconciler, +StateSnapshotObserver observer, +EventRecorder eventRecorder, +StatusRecorder statusRecorder) { +this.validators = validators; +this.ctxFactory = ctxFactory; +this.reconciler = reconciler; +this.observer = observer; +this.eventRecorder = eventRecorder; +this.statusRecorder = statusRecorder; +} + +@Override +public UpdateControl reconcile( +FlinkStateSnapshot flinkStateSnapshot, Context josdkContext) { +// status might be null here +flinkStateSnapshot.setStatus( +Objects.requireNonNullElseGet( +flinkStateSnapshot.getStatus(), FlinkStateSnapshotStatus::new)); +var ctx = ctxFactory.getFlinkStateSnapshotContext(flinkStateSnapshot, josdkContext); + +observer.observe(ctx); + +if (validateSnapshot(ctx)) { +reconciler.reconcile(ctx); +} + +notifyListeners(ctx); +return getUpdateControl(ctx); +} + +@Override +public DeleteControl cleanup( +FlinkStateSnapshot flinkStateSnapshot, Context josdkContext) { +var ctx
Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]
mateczagany commented on code in PR #821: URL: https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1688136504 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java: ## @@ -273,15 +280,19 @@ private void disposeSavepointQuietly( } } -private void observeLatestSavepoint( +private void observeLatestCheckpoint( Review Comment: Yep, this will be the latest checkpoint retrieved via Flink REST API, I have updated the log -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35857] fix redeploy failed deployment without latest checkpoint [flink-kubernetes-operator]
chenyuzhi459 commented on code in PR #855: URL: https://github.com/apache/flink-kubernetes-operator/pull/855#discussion_r1688138227 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java: ## @@ -289,6 +289,8 @@ public boolean reconcileOtherChanges(FlinkResourceContext ctx) throws Except LOG.info("Stopping failed Flink job..."); cleanupAfterFailedJob(ctx); status.setError(null); +ReconciliationUtils.updateStatusForDeployedSpec( +ctx.getResource(), ctx.getDeployConfig(ctx.getResource().getSpec()), clock); Review Comment: Assume a Flink deployment is submitted to the flink-kubernetes-operator for the first time with the following settings: ``` spec.job.upgradeMode=savepoint spec.job.initialSavepointPath=null spec.flinkConfiguration.execution.checkpointing.interval=60s ``` Then I will share the startup and failover process of the flink-kubernetes-operator based on my understanding: 1. At the first reconcile, in the method [AbstractFlinkResourceReconciler.updateStatusBeforeFirstDeployment](https://github.com/apache/flink-kubernetes-operator/blob/29076c80eaac5547e3d12f703c43780cd4a52dad/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L224), the `spec.job.upgradeMode` of the cloned deployment will be set to STATELESS because `spec.job.initialSavepointPath` is empty (this is not synchronously updated to the origin deployment in k8s, which means the origin deployment's spec.job.upgradeMode is still savepoint), and will be serialized into `status.reconciliationStatus.lastReconciledSpec` (this step is synchronously updated to the origin deployment in k8s; I haven't yet studied why this happens). 2. After running for a period of time, the deployment may encounter a problem and exit with a failed status. The operator will save the latest checkpoint in the `status.jobStatus.savepointInfo.lastSavepoint` of the origin deployment in the method `SnapshotObserver.observeSavepointStatus`. 3. Then, in the method [AbstractJobReconciler.resubmitJob](https://github.com/apache/flink-kubernetes-operator/blob/29076c80eaac5547e3d12f703c43780cd4a52dad/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java#L315), the lastReconciledSpec of the origin deployment will be read as the specToRecover variable and passed to the method [AbstractJobReconciler.restore](https://github.com/apache/flink-kubernetes-operator/blob/29076c80eaac5547e3d12f703c43780cd4a52dad/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java#L260). In the method `AbstractJobReconciler.restore`, it will determine whether to recover from lastSavepoint based on whether the `spec.job.upgradeMode` of the specToRecover variable is STATELESS. Before the fix, the upgradeMode here is obviously STATELESS. Therefore, in the failover scenario, I think simply serializing the origin deployment's `spec.job.upgradeMode=SAVEPOINT` into `status.reconciliationStatus.lastReconciledSpec` before resubmitJob can solve this problem. I don't know if there is something wrong with my understanding. If so, I hope you can correct me. Thank you. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35857] fix redeploy failed deployment without latest checkpoint [flink-kubernetes-operator]
gyfora commented on code in PR #855: URL: https://github.com/apache/flink-kubernetes-operator/pull/855#discussion_r1688196270 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java: ## @@ -289,6 +289,8 @@ public boolean reconcileOtherChanges(FlinkResourceContext ctx) throws Except LOG.info("Stopping failed Flink job..."); cleanupAfterFailedJob(ctx); status.setError(null); +ReconciliationUtils.updateStatusForDeployedSpec( +ctx.getResource(), ctx.getDeployConfig(ctx.getResource().getSpec()), clock); Review Comment: Hm, I see; I think you are right. The more straightforward and simpler fix would be to fix the `resubmitJob` method itself. In the case where we have a savepoint, it should set the upgradeMode to savepoint on the spec to recover; otherwise it can leave it stateless. What do you think? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
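A hedged sketch of the `resubmitJob` adjustment discussed here; the accessor chain follows the `status.jobStatus.savepointInfo.lastSavepoint` field mentioned earlier in the thread, while the surrounding class, method names, and import paths are assumptions rather than operator code:
```java
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;

/** Illustrative fragment only; helper and accessor names are assumptions. */
class ResubmitSketch {

    // Pick the upgrade mode for the spec about to be resubmitted, based on whether a
    // savepoint/checkpoint was recorded in status.jobStatus.savepointInfo.lastSavepoint.
    static void prepareSpecToRecover(FlinkDeployment resource, FlinkDeploymentSpec specToRecover) {
        var lastSavepoint =
                resource.getStatus().getJobStatus().getSavepointInfo().getLastSavepoint();
        specToRecover
                .getJob()
                .setUpgradeMode(
                        lastSavepoint != null ? UpgradeMode.SAVEPOINT : UpgradeMode.STATELESS);
    }
}
```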
Re: [PR] [FLINK-35272][cdc][runtime] Pipeline Transform job supports omitting / renaming calculation column [flink-cdc]
yuxiqian commented on PR #3285: URL: https://github.com/apache/flink-cdc/pull/3285#issuecomment-2245470485 Done, rebased due to some conflicts with https://github.com/apache/flink-cdc/commit/26ff6d2a081181f3df7aa49d65d804c57c634122. Will add `CAST ... AS` tests after #3357 is merged. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]
mateczagany commented on code in PR #821: URL: https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1688221106 ## flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkStateSnapshotState.java: ## @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.api.status; + +import org.apache.flink.annotation.Experimental; + +/** Describes current snapshot state. */ +@Experimental +public enum FlinkStateSnapshotState { Review Comment: Good idea, thank you! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-35285) Autoscaler key group optimization can interfere with scale-down.max-factor
[ https://issues.apache.org/jira/browse/FLINK-35285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868093#comment-17868093 ] Trystan commented on FLINK-35285: - {noformat} As long as your job parallelism is very small compared to the max parallelism and we have a lot of divisors the algorithm has a lot of flexibility even with small scale factors. {noformat} Yes, I agree this makes sense. Pairing it with vertex max and a high overall max-parallelism could essentially trick the current algo into working. I would argue that a current parallelism 40 is not very close to the max parallelism of 120, though. Maybe our patterns are outside the norm? But to me this seems well within a "normal" range. Is there any reason why we wouldn't want to adjust the algorithm? To my eyes, it has a flaw in that when a scale is _requested_ it may not _actually_ scale because it does not take into account the current bounds, i.e. {noformat} On scale down, ensure that p < currentParallelism and on scale up p > currentParallelism.{noformat} Without such a check, it is very likely that the loop in question will find p == currentParallelism and then maxParallelism % p == 0 will return true, resulting in no action being taken. Looking at the goals of the algorithm, it seems designed to _try its best_ to find a p such that [max % p == 0|https://github.com/apache/flink-kubernetes-operator/blob/fe3d24e4500d6fcaed55250ccc816546886fd1cf/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L296-L303], but if it fails it should still return p ([here|[https://github.com/apache/flink-kubernetes-operator/blob/fe3d24e4500d6fcaed55250ccc816546886fd1cf/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L296-L303]).] I think a simple check ensuring that p != currentParallelism could let it optimize without deadlocking. Or perhaps I'm misunderstanding the goal. I would be happy to send a PR over with a slightly tweaked algorithm if you're open to adjusting this slightly. > Autoscaler key group optimization can interfere with scale-down.max-factor > -- > > Key: FLINK-35285 > URL: https://issues.apache.org/jira/browse/FLINK-35285 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Reporter: Trystan >Priority: Minor > > When setting a less aggressive scale down limit, the key group optimization > can prevent a vertex from scaling down at all. It will hunt from target > upwards to maxParallelism/2, and will always find currentParallelism again. > > A simple test trying to scale down from a parallelism of 60 with a > scale-down.max-factor of 0.2: > {code:java} > assertEquals(48, JobVertexScaler.scale(60, inputShipStrategies, 360, .8, 8, > 360)); {code} > > It seems reasonable to make a good attempt to spread data across subtasks, > but not at the expense of total deadlock. The problem is that during scale > down it doesn't actually ensure that newParallelism will be < > currentParallelism. The only workaround is to set a scale down factor large > enough such that it finds the next lowest divisor of the maxParallelism. > > Clunky, but something to ensure it can make at least some progress. 
There is > another test that now fails, but just to illustrate the point: > {code:java} > for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) > { > if ((scaleFactor < 1 && p < currentParallelism) || (scaleFactor > 1 && p > > currentParallelism)) { > if (maxParallelism % p == 0) { > return p; > } > } > } {code} > > Perhaps this is by design and not a bug, but total failure to scale down in > order to keep optimized key groups does not seem ideal. > > Key group optimization block: > [https://github.com/apache/flink-kubernetes-operator/blob/fe3d24e4500d6fcaed55250ccc816546886fd1cf/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L296C1-L303C10] -- This message was sent by Atlassian Jira (v8.20.10#820010)
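To make the proposed guard concrete, here is a self-contained sketch of the divisor search with the direction check suggested in the comment; it illustrates the idea only and is not the autoscaler's implementation:
{code:java}
/** Standalone sketch of the guarded, key-group-friendly parallelism search. */
final class ParallelismSearchSketch {

    static int scale(int currentParallelism, int newParallelism, int maxParallelism, int upperBound) {
        boolean scalingDown = newParallelism < currentParallelism;

        // Hunt upwards from the target for a divisor of maxParallelism, but only accept
        // candidates that actually move in the requested direction.
        for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
            boolean movesInDirection = scalingDown ? p < currentParallelism : p > currentParallelism;
            if (movesInDirection && maxParallelism % p == 0) {
                return p;
            }
        }
        // No suitable divisor found: fall back to the originally requested parallelism.
        return Math.min(newParallelism, upperBound);
    }

    public static void main(String[] args) {
        // Scenario from the ticket: scaling down from 60 with maxParallelism 360.
        System.out.println(scale(60, 48, 360, 360)); // prints 48 instead of "finding" 60 again
    }
}
{code}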
[jira] [Comment Edited] (FLINK-35285) Autoscaler key group optimization can interfere with scale-down.max-factor
[ https://issues.apache.org/jira/browse/FLINK-35285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868093#comment-17868093 ] Trystan edited comment on FLINK-35285 at 7/23/24 3:00 PM: -- {noformat} As long as your job parallelism is very small compared to the max parallelism and we have a lot of divisors the algorithm has a lot of flexibility even with small scale factors. {noformat} Yes, I agree this makes sense. Pairing it with vertex max and a high overall max-parallelism could essentially trick the current algo into working. I would argue that a current parallelism 40 is not very close to the max parallelism of 120, though. Maybe our patterns are outside the norm? But to me this seems well within a "normal" range. Is there any reason why we wouldn't want to adjust the algorithm? To my eyes, it has a flaw in that when a scale is _requested_ it may not _actually_ scale because it does not take into account the current bounds, i.e. {noformat} On scale down, ensure that p < currentParallelism and on scale up p > currentParallelism.{noformat} Without such a check, it is very likely that the loop in question will find p == currentParallelism and then maxParallelism % p == 0 will return true, resulting in no action being taken. Looking at the goals of the algorithm, it seems designed to _try its best_ to find a p such that [max % p == 0|https://github.com/apache/flink-kubernetes-operator/blob/fe3d24e4500d6fcaed55250ccc816546886fd1cf/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L296-L303], but if it fails it should still return p ([here|#L296-L303]).]) I think a simple check ensuring that p != currentParallelism could let it optimize without deadlocking. Or perhaps I'm misunderstanding the goal. I would be happy to send a PR over with a slightly tweaked algorithm if you're open to adjusting this slightly. was (Author: trystan): {noformat} As long as your job parallelism is very small compared to the max parallelism and we have a lot of divisors the algorithm has a lot of flexibility even with small scale factors. {noformat} Yes, I agree this makes sense. Pairing it with vertex max and a high overall max-parallelism could essentially trick the current algo into working. I would argue that a current parallelism 40 is not very close to the max parallelism of 120, though. Maybe our patterns are outside the norm? But to me this seems well within a "normal" range. Is there any reason why we wouldn't want to adjust the algorithm? To my eyes, it has a flaw in that when a scale is _requested_ it may not _actually_ scale because it does not take into account the current bounds, i.e. {noformat} On scale down, ensure that p < currentParallelism and on scale up p > currentParallelism.{noformat} Without such a check, it is very likely that the loop in question will find p == currentParallelism and then maxParallelism % p == 0 will return true, resulting in no action being taken. Looking at the goals of the algorithm, it seems designed to _try its best_ to find a p such that [max % p == 0|https://github.com/apache/flink-kubernetes-operator/blob/fe3d24e4500d6fcaed55250ccc816546886fd1cf/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L296-L303], but if it fails it should still return p ([here|[https://github.com/apache/flink-kubernetes-operator/blob/fe3d24e4500d6fcaed55250ccc816546886fd1cf/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L296-L303]).] 
I think a simple check ensuring that p != currentParallelism could let it optimize without deadlocking. Or perhaps I'm misunderstanding the goal. I would be happy to send a PR over with a slightly tweaked algorithm if you're open to adjusting this slightly. > Autoscaler key group optimization can interfere with scale-down.max-factor > -- > > Key: FLINK-35285 > URL: https://issues.apache.org/jira/browse/FLINK-35285 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Reporter: Trystan >Priority: Minor > > When setting a less aggressive scale down limit, the key group optimization > can prevent a vertex from scaling down at all. It will hunt from target > upwards to maxParallelism/2, and will always find currentParallelism again. > > A simple test trying to scale down from a parallelism of 60 with a > scale-down.max-factor of 0.2: > {code:java} > assertEquals(48, JobVertexScaler.scale(60, inputShipStrategies, 360, .8, 8, > 360)); {code} > > It seems reasonable to make a good attempt to spread data across subtasks, > but not at the expense of total deadlock. The problem is that during scale > down it doesn't actually ensure that newPar
[PR] [FLINK-22748][connector-kafka] Allow dynamic target topic selection in SQL Kafka sinks [flink-connector-kafka]
klam-shop opened a new pull request, #109: URL: https://github.com/apache/flink-connector-kafka/pull/109 ## What is the purpose of the change Allows writing to different Kafka topics based on the `topic` metadata column value in SQL, and updates the Table API's `KafkaDynamicSink` to accept a `List<String> topics` instead of `String topic`. The list acts as a whitelist of acceptable values for the `topic` metadata column: - If a single topic is provided, it is used by default as the target topic to produce to - If a list is provided, only that list of topics can be produced to - If no list is provided, any value is accepted Builds on the work in https://github.com/apache/flink/pull/16142 by @SteNicholas ## Brief change log - Adds `topic` as writable metadata - Updates the Table API Kafka sink to accept `List<String> topics` instead of `String topic`, mirroring the source side. - Implements whitelist behaviour in `DynamicKafkaRecordSerializationSchema` ## Verifying this change - [x] Tests that the Sink Factory and related machinery work as expected - [x] Tests the various valid and invalid scenarios for `topic` metadata in `DynamicKafkaRecordSerializationSchema` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: yes - The runtime per-record code paths (performance sensitive): yes - Anything that affects deployment or recovery: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? updated documentation -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
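To make the intended SQL surface concrete, here is a hedged sketch of a table definition that uses `topic` as a writable (persisted) metadata column; the whitelist value and the semicolon-separated list syntax are assumptions modelled on the Kafka source options, not something this PR confirms:
```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DynamicTopicSinkSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // `topic` is declared as a persisted metadata column, so each row can pick its target
        // topic. The 'topic' option below is assumed to act as the whitelist described in the
        // PR, reusing the source's semicolon-separated syntax; treat it as illustrative only.
        tEnv.executeSql(
                "CREATE TABLE dynamic_kafka_sink ("
                        + " `topic` STRING METADATA,"
                        + " user_id STRING,"
                        + " payload STRING"
                        + ") WITH ("
                        + " 'connector' = 'kafka',"
                        + " 'topic' = 'orders;payments',"
                        + " 'properties.bootstrap.servers' = 'localhost:9092',"
                        + " 'format' = 'json'"
                        + ")");
    }
}
```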
Re: [PR] [FLINK-22748][connector-kafka] Allow dynamic target topic selection in SQL Kafka sinks [flink]
klam-shop commented on PR #16142: URL: https://github.com/apache/flink/pull/16142#issuecomment-2245500551 Hi I've taken some time to build upon [Nicholas Jiang](https://issues.apache.org/jira/secure/ViewProfile.jspa?name=nicholasjiang) 's PR and port it to the new kafka connector repo: https://github.com/apache/flink-connector-kafka/pull/109 Would appreciate feedback on this approach, let me know. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-35285) Autoscaler key group optimization can interfere with scale-down.max-factor
[ https://issues.apache.org/jira/browse/FLINK-35285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868093#comment-17868093 ] Trystan edited comment on FLINK-35285 at 7/23/24 3:02 PM: -- {noformat} As long as your job parallelism is very small compared to the max parallelism and we have a lot of divisors the algorithm has a lot of flexibility even with small scale factors. {noformat} Yes, I agree this makes sense. Pairing it with vertex max and a high overall max-parallelism could essentially trick the current algo into working. I would argue that a current parallelism 40 is not very close to the max parallelism of 120, though. Maybe our patterns are outside the norm? But to me this seems well within a "normal" range. Is there any reason why we wouldn't want to adjust the algorithm? To my eyes, it has a flaw in that when a scale is _requested_ it may not _actually_ scale because it does not take into account the current bounds, i.e. {noformat} On scale down, ensure that p < currentParallelism and on scale up p > currentParallelism.{noformat} Without such a check, it is very likely that the loop in question will find p == currentParallelism and then maxParallelism % p == 0 will return true, resulting in no action being taken. Looking at the goals of the algorithm, it seems designed to _try its best_ to find a p such that [max % p == 0|https://github.com/apache/flink-kubernetes-operator/blob/fe3d24e4500d6fcaed55250ccc816546886fd1cf/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L296-L303], but if it fails it should still return p ([here|#L296-L303]). I think a simple check ensuring that p != currentParallelism could let it optimize without deadlocking. Or perhaps I'm misunderstanding the goal. I would be happy to send a PR over with a slightly tweaked algorithm if you're open to adjusting this slightly. was (Author: trystan): {noformat} As long as your job parallelism is very small compared to the max parallelism and we have a lot of divisors the algorithm has a lot of flexibility even with small scale factors. {noformat} Yes, I agree this makes sense. Pairing it with vertex max and a high overall max-parallelism could essentially trick the current algo into working. I would argue that a current parallelism 40 is not very close to the max parallelism of 120, though. Maybe our patterns are outside the norm? But to me this seems well within a "normal" range. Is there any reason why we wouldn't want to adjust the algorithm? To my eyes, it has a flaw in that when a scale is _requested_ it may not _actually_ scale because it does not take into account the current bounds, i.e. {noformat} On scale down, ensure that p < currentParallelism and on scale up p > currentParallelism.{noformat} Without such a check, it is very likely that the loop in question will find p == currentParallelism and then maxParallelism % p == 0 will return true, resulting in no action being taken. Looking at the goals of the algorithm, it seems designed to _try its best_ to find a p such that [max % p == 0|https://github.com/apache/flink-kubernetes-operator/blob/fe3d24e4500d6fcaed55250ccc816546886fd1cf/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L296-L303], but if it fails it should still return p ([here|#L296-L303]).]) I think a simple check ensuring that p != currentParallelism could let it optimize without deadlocking. Or perhaps I'm misunderstanding the goal. 
I would be happy to send a PR over with a slightly tweaked algorithm if you're open to adjusting this slightly. > Autoscaler key group optimization can interfere with scale-down.max-factor > -- > > Key: FLINK-35285 > URL: https://issues.apache.org/jira/browse/FLINK-35285 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Reporter: Trystan >Priority: Minor > > When setting a less aggressive scale down limit, the key group optimization > can prevent a vertex from scaling down at all. It will hunt from target > upwards to maxParallelism/2, and will always find currentParallelism again. > > A simple test trying to scale down from a parallelism of 60 with a > scale-down.max-factor of 0.2: > {code:java} > assertEquals(48, JobVertexScaler.scale(60, inputShipStrategies, 360, .8, 8, > 360)); {code} > > It seems reasonable to make a good attempt to spread data across subtasks, > but not at the expense of total deadlock. The problem is that during scale > down it doesn't actually ensure that newParallelism will be < > currentParallelism. The only workaround is to set a scale down factor large > enough such that it finds the next lowest divisor of the maxParallelism. > > Clunky, but
[jira] [Comment Edited] (FLINK-35285) Autoscaler key group optimization can interfere with scale-down.max-factor
[ https://issues.apache.org/jira/browse/FLINK-35285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868093#comment-17868093 ] Trystan edited comment on FLINK-35285 at 7/23/24 3:03 PM: -- {noformat} As long as your job parallelism is very small compared to the max parallelism and we have a lot of divisors the algorithm has a lot of flexibility even with small scale factors. {noformat} Yes, I agree this makes sense. Pairing it with vertex max and a high overall max-parallelism could essentially trick the current algo into working. I would argue that a current parallelism 40 is not very close to the max parallelism of 120, though. Maybe our patterns are outside the norm? But to me this seems well within a "normal" range. Is there any reason why we wouldn't want to adjust the algorithm? To my eyes, it has a flaw in that when a scale is _requested_ it may not _actually_ scale because it does not take into account the current bounds, i.e. {noformat} On scale down, ensure that p < currentParallelism and on scale up p > currentParallelism.{noformat} Without such a check, it is very likely that the loop in question will find p == currentParallelism and then maxParallelism % p == 0 will return true, resulting in no action being taken. Looking at the goals of the algorithm, it seems designed to _try its best_ to find a p such that [max % p == 0|https://github.com/apache/flink-kubernetes-operator/blob/fe3d24e4500d6fcaed55250ccc816546886fd1cf/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L296-L303], but if it fails it should still return p ([here|https://github.com/apache/flink-kubernetes-operator/blob/fe3d24e4500d6fcaed55250ccc816546886fd1cf/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L305-L306]). I think a simple check ensuring that p != currentParallelism could let it optimize without deadlocking. Or perhaps I'm misunderstanding the goal. I would be happy to send a PR over with a slightly tweaked algorithm if you're open to adjusting this slightly. was (Author: trystan): {noformat} As long as your job parallelism is very small compared to the max parallelism and we have a lot of divisors the algorithm has a lot of flexibility even with small scale factors. {noformat} Yes, I agree this makes sense. Pairing it with vertex max and a high overall max-parallelism could essentially trick the current algo into working. I would argue that a current parallelism 40 is not very close to the max parallelism of 120, though. Maybe our patterns are outside the norm? But to me this seems well within a "normal" range. Is there any reason why we wouldn't want to adjust the algorithm? To my eyes, it has a flaw in that when a scale is _requested_ it may not _actually_ scale because it does not take into account the current bounds, i.e. {noformat} On scale down, ensure that p < currentParallelism and on scale up p > currentParallelism.{noformat} Without such a check, it is very likely that the loop in question will find p == currentParallelism and then maxParallelism % p == 0 will return true, resulting in no action being taken. Looking at the goals of the algorithm, it seems designed to _try its best_ to find a p such that [max % p == 0|https://github.com/apache/flink-kubernetes-operator/blob/fe3d24e4500d6fcaed55250ccc816546886fd1cf/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L296-L303], but if it fails it should still return p ([here|#L296-L303]). 
I think a simple check ensuring that p != currentParallelism could let it optimize without deadlocking. Or perhaps I'm misunderstanding the goal. I would be happy to send a PR over with a slightly tweaked algorithm if you're open to adjusting this slightly. > Autoscaler key group optimization can interfere with scale-down.max-factor > -- > > Key: FLINK-35285 > URL: https://issues.apache.org/jira/browse/FLINK-35285 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Reporter: Trystan >Priority: Minor > > When setting a less aggressive scale down limit, the key group optimization > can prevent a vertex from scaling down at all. It will hunt from target > upwards to maxParallelism/2, and will always find currentParallelism again. > > A simple test trying to scale down from a parallelism of 60 with a > scale-down.max-factor of 0.2: > {code:java} > assertEquals(48, JobVertexScaler.scale(60, inputShipStrategies, 360, .8, 8, > 360)); {code} > > It seems reasonable to make a good attempt to spread data across subtasks, > but not at the expense of total deadlock. The problem is that during scale > down it doesn't actually ensure that newParallelism will b
[jira] [Commented] (FLINK-22748) Allow dynamic target topic selection in SQL Kafka sinks
[ https://issues.apache.org/jira/browse/FLINK-22748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868094#comment-17868094 ] Kevin Lam commented on FLINK-22748: --- Hi I've taken some time to build upon [~nicholasjiang] 's PR and port it to the new kafka connector repo: [https://github.com/apache/flink-connector-kafka/pull/109] Would appreciate feedback on this approach, let me know. > Allow dynamic target topic selection in SQL Kafka sinks > --- > > Key: FLINK-22748 > URL: https://issues.apache.org/jira/browse/FLINK-22748 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka, Table SQL / Ecosystem >Reporter: Timo Walther >Assignee: Nicholas Jiang >Priority: Major > Labels: pull-request-available, stale-assigned, starter > > We should allow to write to different Kafka topics based on some column value > in SQL. > The existing implementation can be easily adapted for that. The "target > topic" would be an additional persisted metadata column in SQL terms. All one > need to do is to adapt > DynamicKafkaSerializationSchema > KafkaDynamicSink > We should guard this dynamic behavior via a config option and make the topic > option optional in this case. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-35285) Autoscaler key group optimization can interfere with scale-down.max-factor
[ https://issues.apache.org/jira/browse/FLINK-35285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868093#comment-17868093 ] Trystan edited comment on FLINK-35285 at 7/23/24 3:03 PM: -- {noformat} As long as your job parallelism is very small compared to the max parallelism and we have a lot of divisors the algorithm has a lot of flexibility even with small scale factors. {noformat} Yes, I agree this makes sense. Pairing it with vertex max and a high overall max-parallelism could essentially trick the current algo into working. I would argue that a current parallelism 40 is not very close to the max parallelism of 120, though. Maybe our patterns are outside the norm? But to me this seems well within a "normal" range. Is there any reason why we wouldn't want to adjust the algorithm? To my eyes, it has a flaw in that when a scale is _requested_ it may not _actually_ scale because it does not take into account the current bounds, i.e. {noformat} On scale down, ensure that p < currentParallelism and on scale up p > currentParallelism.{noformat} Without such a check, it is very likely that the loop in question will find p == currentParallelism and then maxParallelism % p == 0 will return true, resulting in no action being taken. Looking at the goals of the algorithm, it seems designed to _try its best_ to find a p such that [max % p == 0|https://github.com/apache/flink-kubernetes-operator/blob/fe3d24e4500d6fcaed55250ccc816546886fd1cf/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L296-L303], but if it fails it should still return p ([here|https://github.com/apache/flink-kubernetes-operator/blob/fe3d24e4500d6fcaed55250ccc816546886fd1cf/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L305-L306]). I think a simple check ensuring that p != currentParallelism in the keygroup optimization loop could let it optimize without deadlocking. Or perhaps I'm misunderstanding the goal. I would be happy to send a PR over with a slightly tweaked algorithm if you're open to adjusting this slightly. was (Author: trystan): {noformat} As long as your job parallelism is very small compared to the max parallelism and we have a lot of divisors the algorithm has a lot of flexibility even with small scale factors. {noformat} Yes, I agree this makes sense. Pairing it with vertex max and a high overall max-parallelism could essentially trick the current algo into working. I would argue that a current parallelism 40 is not very close to the max parallelism of 120, though. Maybe our patterns are outside the norm? But to me this seems well within a "normal" range. Is there any reason why we wouldn't want to adjust the algorithm? To my eyes, it has a flaw in that when a scale is _requested_ it may not _actually_ scale because it does not take into account the current bounds, i.e. {noformat} On scale down, ensure that p < currentParallelism and on scale up p > currentParallelism.{noformat} Without such a check, it is very likely that the loop in question will find p == currentParallelism and then maxParallelism % p == 0 will return true, resulting in no action being taken. 
Looking at the goals of the algorithm, it seems designed to _try its best_ to find a p such that [max % p == 0|https://github.com/apache/flink-kubernetes-operator/blob/fe3d24e4500d6fcaed55250ccc816546886fd1cf/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L296-L303], but if it fails it should still return p ([here|https://github.com/apache/flink-kubernetes-operator/blob/fe3d24e4500d6fcaed55250ccc816546886fd1cf/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L305-L306]). I think a simple check ensuring that p != currentParallelism could let it optimize without deadlocking. Or perhaps I'm misunderstanding the goal. I would be happy to send a PR over with a slightly tweaked algorithm if you're open to adjusting this slightly. > Autoscaler key group optimization can interfere with scale-down.max-factor > -- > > Key: FLINK-35285 > URL: https://issues.apache.org/jira/browse/FLINK-35285 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Reporter: Trystan >Priority: Minor > > When setting a less aggressive scale down limit, the key group optimization > can prevent a vertex from scaling down at all. It will hunt from target > upwards to maxParallelism/2, and will always find currentParallelism again. > > A simple test trying to scale down from a parallelism of 60 with a > scale-down.max-factor of 0.2: > {code:java} > assertEquals(48, JobVertexScaler.scale(60, inputShipStrategies, 360, .8, 8, > 360)); {code} > > It
Re: [PR] [FLINK-22748][connector-kafka] Allow dynamic target topic selection in SQL Kafka sinks [flink-connector-kafka]
klam-shop commented on code in PR #109: URL: https://github.com/apache/flink-connector-kafka/pull/109#discussion_r1688236979 ## flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java: ## @@ -144,14 +147,34 @@ public void open( valueSerialization.open(context); } +private String getTargetTopic(RowData element) { +final String topic = readMetadata(element, KafkaDynamicSink.WritableMetadata.TOPIC); +if (topic == null && topics == null) { +throw new IllegalArgumentException( +"The topic of the sink record is not valid. Expected a single topic but no topic is set."); +} else if (topic == null && topics.size() == 1) { +return topics.get(0); +} else if (topics != null && topics.size() > 0 && !topics.contains(topic)) { Review Comment: Open to discuss: I decided to keep topics as a List for now, but we can change it to a HashSet. I am assuming the list will be short on average, and a List can outperform a HashSet for a small number of elements: https://stackoverflow.com/questions/150750/hashset-vs-list-performance -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]
mateczagany commented on code in PR #821: URL: https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1688262282 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotContext.java: ## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.controller; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; +import org.apache.flink.kubernetes.operator.api.FlinkDeployment; +import org.apache.flink.kubernetes.operator.api.FlinkSessionJob; +import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot; +import org.apache.flink.kubernetes.operator.api.spec.JobKind; +import org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus; +import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; +import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration; + +import io.fabric8.kubernetes.client.KubernetesClient; +import io.javaoperatorsdk.operator.api.reconciler.Context; +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +import java.util.Optional; + +/** Context for reconciling a snapshot. */ +@Getter +@RequiredArgsConstructor +public class FlinkStateSnapshotContext { + +private final FlinkStateSnapshot resource; +private final FlinkStateSnapshotStatus originalStatus; +private final Context josdkContext; +private final FlinkConfigManager configManager; + +private FlinkOperatorConfiguration operatorConfig; +private Configuration referencedJobObserveConfig; +private FlinkDeployment referencedJobFlinkDeployment; + +/** + * @return Operator configuration for this resource. + */ +public FlinkOperatorConfiguration getOperatorConfig() { +if (operatorConfig != null) { +return operatorConfig; +} +return operatorConfig = +configManager.getOperatorConfiguration( +getResource().getMetadata().getNamespace(), null); Review Comment: Added lazy getter as well, thank you! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-35886) Incorrect watermark idleness timeout accounting when subtask is backpressured/blocked
Piotr Nowojski created FLINK-35886: -- Summary: Incorrect watermark idleness timeout accounting when subtask is backpressured/blocked Key: FLINK-35886 URL: https://issues.apache.org/jira/browse/FLINK-35886 Project: Flink Issue Type: Bug Components: API / DataStream, Runtime / Task Affects Versions: 1.19.1, 1.18.1, 1.20.0 Reporter: Piotr Nowojski Currently when using watermark with idleness in Flink, idleness can be incorrectly detected when reading records from a source that is blocked by the runtime. For example this can easily happen when source is either backpressured, or blocked by the watermark alignment. In those cases, despite there are more records to be read from the source (or source’s split), runtime is deciding not to poll (or being unable to) those records. In such case idleness timeout can kick in, marking source/source split as idle, which can lead to incorrect combined watermark calculations and dropping of incorrectly marked late records. h4. Watermark alignment If there are two source splits, A and B , and maxAllowedWatermarkDrift is set to 30s. # Partition A emitted watermark 1042 sec, while partition B sits at watermark 1000 sec. # {{1042s - 1000s > maxAllowedWatermarkDrift}}, so partition A is blocked by the watermark alignment. # For the duration of idleTimeout, partition B is emitting some large batch of records, that do not advance watermark of that partition by much. For example either watermark for partition B stays 1000s, or is updated by a small amount to for example 1005s. # idleTimeout kicks in, marking partition A as idle # partition B finishes emitting large batch of those older records, and let's say now there is a gap in rowtimes. Previously partition B was emitting records with rowtime ~1000s, now it jumps to for example ~5000s. # As partition A is idle, combined watermark jumps to ~5000s as well. # Watermark alignment unblocks partition A, and it continues emitting records with rowtime ~1042s. But now all of those records are dropped due to being late. h4. Backpressure When there are two SourceOperator’s, A and B. Due to for example some data skew, it could happen that either only A gets backpressured, or A is backpressured quicker/sooner. Either way, during that time when A is backpressured, while B is not, B can bump the combined watermark high enough, so that when backpressure recedes, fresh records from A will be considered as late, leading to incorrect results. -- This message was sent by Atlassian Jira (v8.20.10#820010)
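For context, the two mechanisms that interact in this scenario are both configured on the WatermarkStrategy. A minimal sketch using the numbers from the description above (the MyEvent type and its rowtime field are hypothetical; the strategy calls are the public DataStream API):
{code:java}
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;

class IdlenessAlignmentSketch {
    // MyEvent is hypothetical; only the WatermarkStrategy calls below are real API.
    record MyEvent(long rowtime) {}

    static WatermarkStrategy<MyEvent> strategy() {
        return WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((event, recordTimestamp) -> event.rowtime())
                // A source split is marked idle after 60s without emitted records...
                .withIdleness(Duration.ofSeconds(60))
                // ...while alignment blocks any split running more than 30s ahead of its group.
                .withWatermarkAlignment("alignment-group", Duration.ofSeconds(30), Duration.ofSeconds(1));
    }
}
{code}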
[jira] [Assigned] (FLINK-35886) Incorrect watermark idleness timeout accounting when subtask is backpressured/blocked
[ https://issues.apache.org/jira/browse/FLINK-35886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski reassigned FLINK-35886: -- Assignee: Piotr Nowojski > Incorrect watermark idleness timeout accounting when subtask is > backpressured/blocked > - > > Key: FLINK-35886 > URL: https://issues.apache.org/jira/browse/FLINK-35886 > Project: Flink > Issue Type: Bug > Components: API / DataStream, Runtime / Task >Affects Versions: 1.18.1, 1.20.0, 1.19.1 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Critical > > Currently when using watermark with idleness in Flink, idleness can be > incorrectly detected when reading records from a source that is blocked by > the runtime. For example this can easily happen when source is either > backpressured, or blocked by the watermark alignment. In those cases, despite > there are more records to be read from the source (or source’s split), > runtime is deciding not to poll (or being unable to) those records. In such > case idleness timeout can kick in, marking source/source split as idle, which > can lead to incorrect combined watermark calculations and dropping of > incorrectly marked late records. > h4. Watermark alignment > If there are two source splits, A and B , and maxAllowedWatermarkDrift is set > to 30s. > # Partition A emitted watermark 1042 sec, while partition B sits at watermark > 1000 sec. > # {{1042s - 1000s > maxAllowedWatermarkDrift}}, so partition A is blocked by > the watermark alignment. > # For the duration of idleTimeout, partition B is emitting some large batch > of records, that do not advance watermark of that partition by much. For > example either watermark for partition B stays 1000s, or is updated by a > small amount to for example 1005s. > # idleTimeout kicks in, marking partition A as idle > # partition B finishes emitting large batch of those older records, and let's > say now there is a gap in rowtimes. Previously partition B was emitting > records with rowtime ~1000s, now it jumps to for example ~5000s. > # As partition A is idle, combined watermark jumps to ~5000s as well. > # Watermark alignment unblocks partition A, and it continues emitting records > with rowtime ~1042s. But now all of those records are dropped due to being > late. > h4. Backpressure > When there are two SourceOperator’s, A and B. Due to for example some data > skew, it could happen that either only A gets backpressured, or A is > backpressured quicker/sooner. Either way, during that time when A is > backpressured, while B is not, B can bump the combined watermark high enough, > so that when backpressure recedes, fresh records from A will be considered as > late, leading to incorrect results. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35886) Incorrect watermark idleness timeout accounting when subtask is backpressured/blocked
[ https://issues.apache.org/jira/browse/FLINK-35886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868098#comment-17868098 ] Piotr Nowojski commented on FLINK-35886: I think this problem requires a public API change, so I will publish a FLIP shortly. > Incorrect watermark idleness timeout accounting when subtask is > backpressured/blocked > - > > Key: FLINK-35886 > URL: https://issues.apache.org/jira/browse/FLINK-35886 > Project: Flink > Issue Type: Bug > Components: API / DataStream, Runtime / Task >Affects Versions: 1.18.1, 1.20.0, 1.19.1 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Critical > > Currently when using watermark with idleness in Flink, idleness can be > incorrectly detected when reading records from a source that is blocked by > the runtime. For example this can easily happen when source is either > backpressured, or blocked by the watermark alignment. In those cases, despite > there are more records to be read from the source (or source’s split), > runtime is deciding not to poll (or being unable to) those records. In such > case idleness timeout can kick in, marking source/source split as idle, which > can lead to incorrect combined watermark calculations and dropping of > incorrectly marked late records. > h4. Watermark alignment > If there are two source splits, A and B , and maxAllowedWatermarkDrift is set > to 30s. > # Partition A emitted watermark 1042 sec, while partition B sits at watermark > 1000 sec. > # {{1042s - 1000s > maxAllowedWatermarkDrift}}, so partition A is blocked by > the watermark alignment. > # For the duration of idleTimeout, partition B is emitting some large batch > of records, that do not advance watermark of that partition by much. For > example either watermark for partition B stays 1000s, or is updated by a > small amount to for example 1005s. > # idleTimeout kicks in, marking partition A as idle > # partition B finishes emitting large batch of those older records, and let's > say now there is a gap in rowtimes. Previously partition B was emitting > records with rowtime ~1000s, now it jumps to for example ~5000s. > # As partition A is idle, combined watermark jumps to ~5000s as well. > # Watermark alignment unblocks partition A, and it continues emitting records > with rowtime ~1042s. But now all of those records are dropped due to being > late. > h4. Backpressure > When there are two SourceOperator’s, A and B. Due to for example some data > skew, it could happen that either only A gets backpressured, or A is > backpressured quicker/sooner. Either way, during that time when A is > backpressured, while B is not, B can bump the combined watermark high enough, > so that when backpressure recedes, fresh records from A will be considered as > late, leading to incorrect results. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35272][cdc][runtime] Pipeline Transform job supports omitting / renaming calculation column [flink-cdc]
yuxiqian commented on code in PR #3285: URL: https://github.com/apache/flink-cdc/pull/3285#discussion_r1688271544 ## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java: ## Review Comment: I modified expression evaluation logic here since previous version could not correctly handle expressions like `filter: a > 1 and a < 2` and generates duplicated argument names which would crash Janino compiler. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35272][cdc][runtime] Pipeline Transform job supports omitting / renaming calculation column [flink-cdc]
yuxiqian commented on code in PR #3285: URL: https://github.com/apache/flink-cdc/pull/3285#discussion_r1688271544 ## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java: ## Review Comment: I modified expression evaluation logic here since previous version could not correctly handle expressions like `filter: a > 1 and a < 2` and generates duplicated argument names which would crash the operator. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
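As a tiny, hedged illustration of the duplicated-argument issue described above (not the actual transform code): a filter such as `a > 1 and a < 2` references the same column twice, so the referenced columns must be de-duplicated before they become arguments of the generated expression:
```java
import java.util.List;
import java.util.stream.Collectors;

class ArgumentDedupSketch {
    public static void main(String[] args) {
        // Column references extracted from "a > 1 AND a < 2": 'a' appears twice.
        List<String> referencedColumns = List.of("a", "a");

        // De-duplicate while keeping order, so the generated expression gets a single 'a' argument.
        List<String> arguments = referencedColumns.stream().distinct().collect(Collectors.toList());

        System.out.println(arguments); // [a]
    }
}
```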
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1688281873 ## flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsSinkBuilderTest.java: ## @@ -25,14 +25,6 @@ /** Covers construction, defaults and sanity checking of {@link SqsSinkBuilder}. */ class SqsSinkBuilderTest { -@Test -void elementConverterOfSinkMustBeSetWhenBuilt() { -Assertions.assertThatExceptionOfType(NullPointerException.class) -.isThrownBy(() -> SqsSink.builder().setSqsUrl("sqlUrl").build()) -.withMessageContaining( -"No SerializationSchema was supplied to the SQS Sink builder."); -} - Review Comment: Because we added a default SerializationSchema when one is not provided by the customer, we never end up throwing this error. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
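A hedged sketch of the defaulting behaviour this comment describes; the connector's actual default schema is not shown in the thread, so the fallback below is only an assumption:
```java
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;

/** Assumed shape of the builder change; field and method names are illustrative only. */
class SqsBuilderDefaultSketch {
    private SerializationSchema<String> serializationSchema;

    // Fall back to a String schema instead of failing with
    // "No SerializationSchema was supplied to the SQS Sink builder."
    SerializationSchema<String> resolveSerializationSchema() {
        return serializationSchema != null ? serializationSchema : new SimpleStringSchema();
    }
}
```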
[jira] [Comment Edited] (FLINK-35886) Incorrect watermark idleness timeout accounting when subtask is backpressured/blocked
[ https://issues.apache.org/jira/browse/FLINK-35886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868098#comment-17868098 ] Piotr Nowojski edited comment on FLINK-35886 at 7/23/24 3:45 PM: - I think this problem requires a public API change, so I have published a FLIP for this: https://cwiki.apache.org/confluence/display/FLINK/FLIP-471%3A++Fixing+watermark+idleness+timeout+accounting was (Author: pnowojski): I think this problem requires a public API change, so I will publish a FLIP shortly. > Incorrect watermark idleness timeout accounting when subtask is > backpressured/blocked > - > > Key: FLINK-35886 > URL: https://issues.apache.org/jira/browse/FLINK-35886 > Project: Flink > Issue Type: Bug > Components: API / DataStream, Runtime / Task >Affects Versions: 1.18.1, 1.20.0, 1.19.1 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Critical > > Currently, when using watermarks with idleness in Flink, idleness can be > incorrectly detected when reading records from a source that is blocked by > the runtime. For example, this can easily happen when the source is either > backpressured or blocked by watermark alignment. In those cases, despite > there being more records to read from the source (or the source’s split), the > runtime decides not to poll (or is unable to poll) those records. In such > a case the idleness timeout can kick in, marking the source/source split as idle, which > can lead to incorrect combined watermark calculations and to dropping records that were > incorrectly marked as late. > h4. Watermark alignment > If there are two source splits, A and B, and maxAllowedWatermarkDrift is set > to 30s: > # Partition A emitted watermark 1042 sec, while partition B sits at watermark > 1000 sec. > # {{1042s - 1000s > maxAllowedWatermarkDrift}}, so partition A is blocked by > the watermark alignment. > # For the duration of idleTimeout, partition B is emitting some large batch > of records that do not advance the watermark of that partition by much. For > example, the watermark for partition B either stays at 1000s or is updated by a > small amount, for example to 1005s. > # idleTimeout kicks in, marking partition A as idle. > # Partition B finishes emitting the large batch of those older records, and let's > say there is now a gap in rowtimes. Previously partition B was emitting > records with rowtime ~1000s; now it jumps to, for example, ~5000s. > # As partition A is idle, the combined watermark jumps to ~5000s as well. > # Watermark alignment unblocks partition A, and it continues emitting records > with rowtime ~1042s. But now all of those records are dropped due to being > late. > h4. Backpressure > Suppose there are two SourceOperators, A and B. Due to, for example, some data > skew, it could happen that either only A gets backpressured, or A is > backpressured sooner. Either way, during the time when A is > backpressured while B is not, B can bump the combined watermark high enough > that, when the backpressure recedes, fresh records from A will be considered > late, leading to incorrect results. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35887) Null Pointer Exception in TypeExtractor.isRecord when trying to provide type info for interface
Jacob Jona Fahlenkamp created FLINK-35887: - Summary: Null Pointer Exception in TypeExtractor.isRecord when trying to provide type info for interface Key: FLINK-35887 URL: https://issues.apache.org/jira/browse/FLINK-35887 Project: Flink Issue Type: Bug Components: API / Type Serialization System Affects Versions: 1.19.1 Reporter: Jacob Jona Fahlenkamp The following code
{code:java}
import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.PojoTestUtils;
import org.junit.jupiter.api.Test;

import java.lang.reflect.Type;
import java.util.Map;

public class DebugTest {
    @TypeInfo(FooFactory.class)
    public interface Foo {}

    public static class FooFactory extends TypeInfoFactory<Foo> {
        @Override
        public TypeInformation<Foo> createTypeInfo(Type type, Map<String, TypeInformation<?>> map) {
            return Types.POJO(Foo.class, Map.of());
        }
    }

    @Test
    void test() {
        PojoTestUtils.assertSerializedAsPojo(Foo.class);
    }
}
{code}
throws this exception:
{code:java}
java.lang.NullPointerException: Cannot invoke "java.lang.Class.getName()" because the return value of "java.lang.Class.getSuperclass()" is null
    at org.apache.flink.api.java.typeutils.TypeExtractor.isRecord(TypeExtractor.java:2227)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.<init>(PojoSerializer.java:125)
    at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:359)
    at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:347)
    at org.apache.flink.types.PojoTestUtils.assertSerializedAsPojo(PojoTestUtils.java:48)
{code}
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35887) Null Pointer Exception in TypeExtractor.isRecord when trying to provide type info for interface
[ https://issues.apache.org/jira/browse/FLINK-35887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jacob Jona Fahlenkamp updated FLINK-35887: -- Priority: Major (was: Minor) > Null Pointer Exception in TypeExtractor.isRecord when trying to provide type > info for interface > --- > > Key: FLINK-35887 > URL: https://issues.apache.org/jira/browse/FLINK-35887 > Project: Flink > Issue Type: Bug > Components: API / Type Serialization System >Affects Versions: 1.19.1 >Reporter: Jacob Jona Fahlenkamp >Priority: Major > > The following code
> {code:java}
> import org.apache.flink.api.common.typeinfo.TypeInfo;
> import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.types.PojoTestUtils;
> import org.junit.jupiter.api.Test;
>
> import java.lang.reflect.Type;
> import java.util.Map;
>
> public class DebugTest {
>     @TypeInfo(FooFactory.class)
>     public interface Foo {}
>
>     public static class FooFactory extends TypeInfoFactory<Foo> {
>         @Override
>         public TypeInformation<Foo> createTypeInfo(Type type, Map<String, TypeInformation<?>> map) {
>             return Types.POJO(Foo.class, Map.of());
>         }
>     }
>
>     @Test
>     void test() {
>         PojoTestUtils.assertSerializedAsPojo(Foo.class);
>     }
> }
> {code}
> throws this exception:
> {code:java}
> java.lang.NullPointerException: Cannot invoke "java.lang.Class.getName()" because the return value of "java.lang.Class.getSuperclass()" is null
>     at org.apache.flink.api.java.typeutils.TypeExtractor.isRecord(TypeExtractor.java:2227)
>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.<init>(PojoSerializer.java:125)
>     at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:359)
>     at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:347)
>     at org.apache.flink.types.PojoTestUtils.assertSerializedAsPojo(PojoTestUtils.java:48)
> {code}
-- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]
mateczagany commented on code in PR #821: URL: https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1688463075 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/AuditUtils.java: ## @@ -53,6 +62,16 @@ private static String format(@NonNull CommonStatus status) { : status.getError()); } +private static String format(@NonNull FlinkStateSnapshotStatus status) { +if (StringUtils.isEmpty(status.getError())) { +return String.format( +">>> Status[Snapshot] | Info | %s | %s", status.getState(), status.getPath()); Review Comment: I have added some changes for this, thank you! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]
mateczagany commented on code in PR #821: URL: https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1688472333 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java: ## @@ -716,6 +695,48 @@ public CheckpointFetchResult fetchCheckpointInfo( } } +@Override +public CheckpointStatsResult fetchCheckpointStats( +String jobId, Long checkpointId, Configuration conf) { +try (RestClusterClient clusterClient = getClusterClient(conf)) { +var checkpointStatusHeaders = CheckpointStatisticDetailsHeaders.getInstance(); +var parameters = checkpointStatusHeaders.getUnresolvedMessageParameters(); +parameters.jobPathParameter.resolve(JobID.fromHexString(jobId)); + +// This was needed because the parameter is protected +var checkpointIdPathParameter = +(CheckpointIdPathParameter) Iterables.getLast(parameters.getPathParameters()); +checkpointIdPathParameter.resolve(checkpointId); + +var response = +clusterClient.sendRequest( +checkpointStatusHeaders, parameters, EmptyRequestBody.getInstance()); + +var stats = response.get(); +if (stats == null) { +throw new IllegalStateException("Checkpoint ID %d for job %s does not exist!"); +} else if (stats instanceof CheckpointStatistics.CompletedCheckpointStatistics) { +return CheckpointStatsResult.completed( +((CheckpointStatistics.CompletedCheckpointStatistics) stats) +.getExternalPath()); +} else if (stats instanceof CheckpointStatistics.FailedCheckpointStatistics) { +return CheckpointStatsResult.error( +((CheckpointStatistics.FailedCheckpointStatistics) stats) +.getFailureMessage()); +} else if (stats instanceof CheckpointStatistics.PendingCheckpointStatistics) { +return CheckpointStatsResult.pending(); +} else { +throw new IllegalArgumentException( +String.format( +"Unknown checkpoint statistics result class: %s", +stats.getClass().getSimpleName())); +} +} catch (Exception e) { +LOG.error("Exception while fetching checkpoint statistics", e); Review Comment: We should handle cases where the checkpoint statistics are no longer stored on the web server, and we get the following error: ``` Could not find checkpointing statistics for checkpoint 243. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
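As an illustration of the reviewer's point (not the PR's actual change), one hedged way to recognize that case is to match on the quoted failure message so it can be surfaced as a dedicated result instead of a generic fetch error; the helper name and the message-matching approach are assumptions:

```java
// Illustration only: detect the "statistics no longer stored on the web server" case by
// walking the cause chain and checking for the message quoted in the review comment.
final class CheckpointStatsErrorSketch {
    static boolean isCheckpointStatsNoLongerAvailable(Throwable error) {
        for (Throwable t = error; t != null; t = t.getCause()) {
            String message = t.getMessage();
            if (message != null && message.contains("Could not find checkpointing statistics")) {
                return true;
            }
        }
        return false;
    }
}
```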
[jira] [Commented] (FLINK-35285) Autoscaler key group optimization can interfere with scale-down.max-factor
[ https://issues.apache.org/jira/browse/FLINK-35285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868168#comment-17868168 ] Gyula Fora commented on FLINK-35285: {noformat} I would argue that a current parallelism 40 is not very close to the max parallelism of 120, though. Maybe our patterns are outside the norm? But to me this seems well within a "normal" range.{noformat} My production recommendation is to start with 720 as the max parallelism for anything but the smallest jobs. Your parallelism setting is definitely in the normal range, but I think your max parallelism is not ideal given how key distribution works in Flink, especially if you want to use the autoscaler. I don't really understand your proposal. Let's say the ideal parallelism computed using the scale factor is X: the autoscaler can decide to scale to anything larger than X, as that would still satisfy the throughput requirement, but we can never scale to anything lower than X. This bound is true for both scale-ups and scale-downs. We always have to find the value closest to X that is still larger than or equal to it. > Autoscaler key group optimization can interfere with scale-down.max-factor > -- > > Key: FLINK-35285 > URL: https://issues.apache.org/jira/browse/FLINK-35285 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Reporter: Trystan >Priority: Minor > > When setting a less aggressive scale down limit, the key group optimization > can prevent a vertex from scaling down at all. It will hunt from target > upwards to maxParallelism/2, and will always find currentParallelism again. > > A simple test trying to scale down from a parallelism of 60 with a > scale-down.max-factor of 0.2: > {code:java} > assertEquals(48, JobVertexScaler.scale(60, inputShipStrategies, 360, .8, 8, > 360)); {code} > > It seems reasonable to make a good attempt to spread data across subtasks, > but not at the expense of total deadlock. The problem is that during scale > down it doesn't actually ensure that newParallelism will be < > currentParallelism. The only workaround is to set a scale down factor large > enough such that it finds the next lowest divisor of the maxParallelism. > > Clunky, but something to ensure it can make at least some progress. There is > another test that now fails, but just to illustrate the point: > {code:java} > for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) > { > if ((scaleFactor < 1 && p < currentParallelism) || (scaleFactor > 1 && p > > currentParallelism)) { > if (maxParallelism % p == 0) { > return p; > } > } > } {code} > > Perhaps this is by design and not a bug, but total failure to scale down in > order to keep optimized key groups does not seem ideal. > > Key group optimization block: > [https://github.com/apache/flink-kubernetes-operator/blob/fe3d24e4500d6fcaed55250ccc816546886fd1cf/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L296C1-L303C10] -- This message was sent by Atlassian Jira (v8.20.10#820010)
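To make the bound concrete, here is a small self-contained sketch using the numbers from the ticket's own test case (current parallelism 60, max parallelism 360, scale-down factor 0.2, so X = 48). It mimics the divisor search rather than calling the actual JobVertexScaler:
{code:java}
public class KeyGroupBoundExample {
    public static void main(String[] args) {
        int currentParallelism = 60;
        int maxParallelism = 360;
        double scaleDownFactor = 0.2;

        // X: the lowest parallelism the scale-down factor allows (60 * 0.8 = 48).
        int x = (int) Math.ceil(currentParallelism * (1 - scaleDownFactor));

        // Search upwards from X for a parallelism that divides maxParallelism evenly,
        // mirroring the key-group optimization described in the ticket.
        int chosen = x;
        for (int p = x; p <= maxParallelism / 2; p++) {
            if (maxParallelism % p == 0) {
                chosen = p;
                break;
            }
        }

        // Prints 60: the first divisor of 360 at or above 48 is the current parallelism
        // itself, which is why no scale-down happens in this configuration.
        System.out.println(chosen);
    }
}
{code}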
[jira] [Commented] (FLINK-35285) Autoscaler key group optimization can interfere with scale-down.max-factor
[ https://issues.apache.org/jira/browse/FLINK-35285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868169#comment-17868169 ] Gyula Fora commented on FLINK-35285: It's much better to not scale down than to undershoot the parallelism because that will immediately lead to problems. > Autoscaler key group optimization can interfere with scale-down.max-factor > -- > > Key: FLINK-35285 > URL: https://issues.apache.org/jira/browse/FLINK-35285 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Reporter: Trystan >Priority: Minor > > When setting a less aggressive scale down limit, the key group optimization > can prevent a vertex from scaling down at all. It will hunt from target > upwards to maxParallelism/2, and will always find currentParallelism again. > > A simple test trying to scale down from a parallelism of 60 with a > scale-down.max-factor of 0.2: > {code:java} > assertEquals(48, JobVertexScaler.scale(60, inputShipStrategies, 360, .8, 8, > 360)); {code} > > It seems reasonable to make a good attempt to spread data across subtasks, > but not at the expense of total deadlock. The problem is that during scale > down it doesn't actually ensure that newParallelism will be < > currentParallelism. The only workaround is to set a scale down factor large > enough such that it finds the next lowest divisor of the maxParallelism. > > Clunky, but something to ensure it can make at least some progress. There is > another test that now fails, but just to illustrate the point: > {code:java} > for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) > { > if ((scaleFactor < 1 && p < currentParallelism) || (scaleFactor > 1 && p > > currentParallelism)) { > if (maxParallelism % p == 0) { > return p; > } > } > } {code} > > Perhaps this is by design and not a bug, but total failure to scale down in > order to keep optimized key groups does not seem ideal. > > Key group optimization block: > [https://github.com/apache/flink-kubernetes-operator/blob/fe3d24e4500d6fcaed55250ccc816546886fd1cf/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L296C1-L303C10] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [DRAFT][FLINK-34440][formats][protobuf-confluent] Protobuf confluent dynamic format [flink]
dmariassy opened a new pull request, #25114: URL: https://github.com/apache/flink/pull/25114

**This is a DRAFT PR**.

## TODO
- Add missing boilerplate (e.g. config)
- Add debezium support
- Test the format in Shopify Flink jobs
- Docs (might be a separate PR)

## What is the purpose of the change
- Add support for deserializing protobuf messages that use the Confluent wire format and whose schemas can be fetched from the Confluent Schema Registry
- Add support for serializing Flink records using the Confluent protobuf wire format

## Brief change log
My intention was to:
- Maintain parity with the existing flink-protobuf format's semantics in terms of the Flink -> Protobuf / Protobuf -> Flink conversions
- Maximize code reuse between the flink-protobuf-confluent and flink-protobuf formats

### Deserializer
- Fetch the message's protobuf descriptor from the Confluent schema registry
- Generate a Java class from the descriptor at runtime
- Deserialize `byte[]`s to the generated `protobuf.Message` type using an `io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer`
- Delegate the work of converting between a `protobuf.Message` and a `RowData` object to the existing flink-protobuf format

### Serializer
- Convert the user's `RowType` to a protobuf descriptor
- Generate a Java class from the descriptor at runtime
- Delegate the `RowData` -> `AbstractMessage` conversion to the existing flink-protobuf format
- Serialize the `AbstractMessage` object using an `io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer`

## Verifying this change
Please make sure both new and modified tests in this PR follow [the conventions for tests defined in our code quality guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing).

This change added tests and can be verified as follows:
- Added comprehensive test coverage
- Will shortly deploy to Shopify Flink clusters

## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): yes (com.github.os72:protoc-jar)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: yes
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no

## Documentation
- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? docs to follow

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
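As a rough, hedged sketch of the deserializer flow outlined above (not the PR's implementation): the Confluent deserializer resolves the writer schema from the registry and yields a protobuf `Message`, which the existing flink-protobuf conversion logic would then turn into `RowData`. The registry URL, topic name, and placeholder helper are illustrative assumptions:

```java
import com.google.protobuf.Message;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;

public class ProtobufConfluentFlowSketch {
    public static void main(String[] args) {
        // Illustrative schema registry endpoint and identity-map capacity.
        SchemaRegistryClient registryClient =
                new CachedSchemaRegistryClient("http://localhost:8081", 100);

        // The Confluent deserializer reads the wire-format header, fetches the writer
        // schema from the registry, and returns a protobuf Message instance.
        try (KafkaProtobufDeserializer<Message> deserializer =
                new KafkaProtobufDeserializer<>(registryClient)) {
            byte[] confluentWireFormatBytes = readRecordValue(); // placeholder bytes
            Message message = deserializer.deserialize("orders", confluentWireFormatBytes);

            // The Message would then be handed to the existing flink-protobuf conversion
            // logic to produce a RowData (omitted here).
            System.out.println(message);
        }
    }

    private static byte[] readRecordValue() {
        // Placeholder: a real run needs an actual Confluent-framed Kafka record value.
        return new byte[0];
    }
}
```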