[PR] [FLINK-35776] Simplify job status handling [flink-kubernetes-operator]
gyfora opened a new pull request, #851: URL: https://github.com/apache/flink-kubernetes-operator/pull/851 ## What is the purpose of the change There is a fairly complicated listing / observe logic for jobs currently that is no longer necessary as we have a stable logic to always record the jobID in the status before submission. ## Brief change log - Replace complex resource specific job listing/matching logic with JobId based querying - Clean up unnecessary codepaths - Tests ## Verifying this change The change is already covered by existing tests to a large extent. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changes to the `CustomResourceDescriptors`: no - Core observer or reconciler logic that is regularly executed: yes ## Documentation - Does this pull request introduce a new feature? no -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35776) Simplify job observe logic
[ https://issues.apache.org/jira/browse/FLINK-35776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35776: --- Labels: pull-request-available (was: ) > Simplify job observe logic > -- > > Key: FLINK-35776 > URL: https://issues.apache.org/jira/browse/FLINK-35776 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: Gyula Fora >Assignee: Gyula Fora >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.10.0 > > > There is a fairly complicated listing / observe logic for jobs currently that > is no longer necessary as we have a stable logic to always record the jobID > in the status before submission. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35777) Observe session jobs during cleanup
Gyula Fora created FLINK-35777: -- Summary: Observe session jobs during cleanup Key: FLINK-35777 URL: https://issues.apache.org/jira/browse/FLINK-35777 Project: Flink Issue Type: Improvement Components: Kubernetes Operator Reporter: Gyula Fora Fix For: kubernetes-operator-1.10.0 In the same way as we do for FlinkDeployments, session jobs should also be observed in the cleanup logic so that the reconciler works correctly.
[jira] [Commented] (FLINK-35624) Release Testing: Verify FLIP-306 Unified File Merging Mechanism for Checkpoints
[ https://issues.apache.org/jira/browse/FLINK-35624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17863545#comment-17863545 ]

Zakelly Lan commented on FLINK-35624:

Thanks [~fanrui]! I think it is related to FLINK-35778. The directory path the manager tried to create contains the character ':', which is reserved in URI syntax. I'm working on a fix.

> Release Testing: Verify FLIP-306 Unified File Merging Mechanism for Checkpoints
>
> Key: FLINK-35624
> URL: https://issues.apache.org/jira/browse/FLINK-35624
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Checkpointing
> Reporter: Zakelly Lan
> Assignee: Rui Fan
> Priority: Blocker
> Labels: release-testing
> Fix For: 1.20.0
> Attachments: image-2024-07-07-14-04-47-065.png
>
> Follow up the test for https://issues.apache.org/jira/browse/FLINK-32070
>
> 1.20 is the MVP version for FLIP-306. It is a little bit complex and should be tested carefully. The main idea of FLIP-306 is to merge checkpoint files on the TM side and provide new {{StateHandle}}s to the JM. There will be a TM-managed directory under the 'shared' checkpoint directory for each subtask, and a TM-managed directory under the 'taskowned' checkpoint directory for each Task Manager. Under those newly introduced directories, the checkpoint files will be merged into a smaller file set. The following scenarios need to be tested, including but not limited to:
> # With file merging enabled, periodic checkpoints perform properly, and failover, restore and rescale also work well.
> # When file merging is switched on and off across jobs, checkpoints and recovery also work properly.
> # There is no left-over TM-managed directory, especially when no checkpoint completes before the job is cancelled.
> # File merging has no effect on (native) savepoints.
> Besides the behaviors above, it is better to validate the function of space amplification control and metrics.
> All the config options can be found under 'execution.checkpointing.file-merging'.
[jira] [Created] (FLINK-35778) Escape URI reserved characters when creating file-merging directories
Zakelly Lan created FLINK-35778: --- Summary: Escape URI reserved characters when creating file-merging directories Key: FLINK-35778 URL: https://issues.apache.org/jira/browse/FLINK-35778 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing Affects Versions: 1.20.0 Reporter: Zakelly Lan Assignee: Zakelly Lan Fix For: 1.20.0 Currently, the file-merging manager for checkpoint files creates directories based on the TM resource ID, the job ID and the operator IDs. In some cases these IDs include characters that are reserved in URI syntax, so we should apply a simple escape to them.
[jira] [Updated] (FLINK-35778) Escape URI reserved characters when creating file-merging directories
[ https://issues.apache.org/jira/browse/FLINK-35778?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zakelly Lan updated FLINK-35778: Priority: Blocker (was: Major) > Escape URI reserved characters when creating file-merging directories > - > > Key: FLINK-35778 > URL: https://issues.apache.org/jira/browse/FLINK-35778 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.20.0 >Reporter: Zakelly Lan >Assignee: Zakelly Lan >Priority: Blocker > Fix For: 1.20.0 > > > Currently, the file-merging manager for checkpoint files will create > directories based on tm resource id, job id and operator ids. While in some > cases, these ids include some characters that are reserved in URI scheme. So > we should do a simple escape for those ids. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-35778] Escape URI reserved characters when creating file-merging directories [flink]
Zakelly opened a new pull request, #25030:
URL: https://github.com/apache/flink/pull/25030

## What is the purpose of the change

Currently, the file-merging manager for checkpoint files creates directories based on the TM resource ID, the job ID and the operator IDs. In some cases these IDs include characters that are reserved in URI syntax. This PR applies a simple escape to those IDs.

## Brief change log

- Add a tool method `uriEscape` in file-merging manager.

## Verifying this change

- New UT `FileMergingSnapshotManagerTestBase#testSpecialCharactersInPath`.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable
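For readers following FLINK-35778, here is a minimal sketch of what "a simple escape" for directory names could look like. It is not the code from PR #25030: the class name `UriEscapeSketch` is hypothetical, the method body is an assumption, and percent-encoding via `java.net.URLEncoder` is just one possible strategy (the actual `uriEscape` may well do something else, such as substituting reserved characters). It requires Java 10+ for the `Charset` overload of `encode`.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper for illustration; not the implementation from the PR. */
final class UriEscapeSketch {

    /**
     * Percent-encodes every character outside URLEncoder's safe set
     * (letters, digits, '.', '-', '*', '_'), so ':' or '/' in an ID
     * can no longer be misread as URI syntax.
     */
    static String uriEscape(String id) {
        // URLEncoder targets HTML form encoding and turns a space into '+';
        // map that back to the percent form used in URI paths. A literal '+'
        // in the input is already encoded as %2B, so this replace is safe.
        return URLEncoder.encode(id, StandardCharsets.UTF_8).replace("+", "%20");
    }

    public static void main(String[] args) {
        System.out.println(uriEscape("tm-host:6122")); // tm-host%3A6122
        System.out.println(uriEscape("job_1/op 2"));   // job_1%2Fop%202
    }
}
```

Percent-encoding keeps the mapping reversible, at the cost of less readable directory names; a substitution-based escape would be simpler to read on disk but could map distinct IDs to the same directory.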
[jira] [Updated] (FLINK-35778) Escape URI reserved characters when creating file-merging directories
[ https://issues.apache.org/jira/browse/FLINK-35778?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35778: --- Labels: pull-request-available (was: ) > Escape URI reserved characters when creating file-merging directories > - > > Key: FLINK-35778 > URL: https://issues.apache.org/jira/browse/FLINK-35778 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.20.0 >Reporter: Zakelly Lan >Assignee: Zakelly Lan >Priority: Blocker > Labels: pull-request-available > Fix For: 1.20.0 > > > Currently, the file-merging manager for checkpoint files will create > directories based on tm resource id, job id and operator ids. While in some > cases, these ids include some characters that are reserved in URI scheme. So > we should do a simple escape for those ids. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35778] Escape URI reserved characters when creating file-merging directories [flink]
flinkbot commented on PR #25030: URL: https://github.com/apache/flink/pull/25030#issuecomment-2212368322 ## CI report: * 7e0567cd69b36759b378a88f9cf3693060f30f5e UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[PR] [BP-1.20][FLINK-35778] Escape URI reserved characters when creating file-merging directories [flink]
Zakelly opened a new pull request, #25031:
URL: https://github.com/apache/flink/pull/25031

## What is the purpose of the change

Currently, the file-merging manager for checkpoint files creates directories based on the TM resource ID, the job ID and the operator IDs. In some cases these IDs include characters that are reserved in URI syntax. This PR applies a simple escape to those IDs.

## Brief change log

- Add a tool method `uriEscape` in file-merging manager.

## Verifying this change

- New UT `FileMergingSnapshotManagerTestBase#testSpecialCharactersInPath`.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable
Re: [PR] [BP-1.20][FLINK-35778] Escape URI reserved characters when creating file-merging directories [flink]
flinkbot commented on PR #25031: URL: https://github.com/apache/flink/pull/25031#issuecomment-2212370332 ## CI report: * b47550e43a1d71c35c476ce69fbbb26bec549e68 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
Re: [PR] [FLINK-35126] Rework default checkpoint progress check window [flink-kubernetes-operator]
gyfora commented on code in PR #850:
URL: https://github.com/apache/flink-kubernetes-operator/pull/850#discussion_r1667639016

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java: ##

@@ -178,30 +178,46 @@ private boolean evaluateCheckpoints(
             return true;
         }

-        var completedCheckpointsCheckWindow =
-                configuration.get(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW);
+        var windowOpt =
+                configuration.getOptional(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW);

         CheckpointConfig checkpointConfig = new CheckpointConfig();
         checkpointConfig.configure(configuration);
         var checkpointingInterval = checkpointConfig.getCheckpointInterval();
         var checkpointingTimeout = checkpointConfig.getCheckpointTimeout();
-        var tolerationFailureNumber = checkpointConfig.getTolerableCheckpointFailureNumber() + 1;
-        var minCompletedCheckpointsCheckWindow =
-                Math.max(
-                        checkpointingInterval * tolerationFailureNumber,
-                        checkpointingTimeout * tolerationFailureNumber);
-        if (completedCheckpointsCheckWindow.toMillis() < minCompletedCheckpointsCheckWindow) {
-            LOG.warn(
-                    "{} is not long enough. Default to max({} * {}, {} * {}): {}ms",
-                    OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW.key(),
-                    CHECKPOINTING_INTERVAL.key(),
-                    TOLERABLE_FAILURE_NUMBER.key(),
-                    CHECKPOINTING_TIMEOUT.key(),
-                    TOLERABLE_FAILURE_NUMBER.key(),
-                    minCompletedCheckpointsCheckWindow);
-            completedCheckpointsCheckWindow = Duration.ofMillis(minCompletedCheckpointsCheckWindow);
+        var tolerationFailureNumber = checkpointConfig.getTolerableCheckpointFailureNumber() + 2;

Review Comment:
Good point adding this to the docs
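To make the arithmetic in the removed lines of this hunk easier to follow, here is a small standalone sketch of the old minimum-window rule: the check window was raised to max(interval, timeout) multiplied by (tolerable failures + 1) whenever the configured value was shorter. The class and method names are hypothetical and the values are plain `long`/`Duration` rather than Flink configuration objects. The reworked logic is only partly visible in the quoted hunk (an optional window and a `+ 2` term), so it is deliberately not modeled here.

```java
import java.time.Duration;

/** Hypothetical standalone model of the removed check; not operator code. */
final class CheckpointWindowSketch {

    /**
     * Returns the configured window, or the computed minimum when the
     * configured window is too short to observe a completed checkpoint.
     */
    static Duration effectiveWindow(
            Duration configured, long intervalMs, long timeoutMs, int tolerableFailures) {
        long attempts = tolerableFailures + 1L;
        long minWindowMs = Math.max(intervalMs * attempts, timeoutMs * attempts);
        return configured.toMillis() < minWindowMs ? Duration.ofMillis(minWindowMs) : configured;
    }

    public static void main(String[] args) {
        // 1 min interval, 10 min timeout, 2 tolerable failures: minimum is 30 min.
        System.out.println(effectiveWindow(Duration.ofMinutes(5), 60_000L, 600_000L, 2)); // PT30M
        // A window longer than the minimum is kept as configured.
        System.out.println(effectiveWindow(Duration.ofHours(1), 60_000L, 600_000L, 2)); // PT1H
    }
}
```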
Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]
gyfora commented on PR #821:
URL: https://github.com/apache/flink-kubernetes-operator/pull/821#issuecomment-2212378196

A small addition to the previous comments. The SnapshotObserver currently also observes and updates the lastSavepointInfo for terminal jobs. This is a key mechanism for handling failures during stateful upgrades. So we have to update that logic as well, so that instead of recording the lastSavepoint status we use a new shared logic with the snapshot custom resource mechanism.

This mechanism is currently covered by some tests; enabling the snapshot CR for those tests would probably help reproduce the problem. For example:

1. Introduce a failure after executing a stop-with-savepoint operation (before the snapshot CR was created)
2. This is the point where the observer would actually observe the savepoint/checkpoint info from the terminal job and update the status
3. Assert that the upgrade is actually executed from the correct savepoint

I believe that with the current implementation the savepoint info would be lost.
Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]
gyfora commented on code in PR #821:
URL: https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1667645251

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java: ##

@@ -266,19 +304,31 @@ protected void restoreJob(
         Optional savepointOpt = Optional.empty();

         if (spec.getJob().getUpgradeMode() != UpgradeMode.STATELESS) {
-            savepointOpt =
-                    Optional.ofNullable(
-                                    ctx.getResource()
-                                            .getStatus()
-                                            .getJobStatus()
-                                            .getSavepointInfo()
-                                            .getLastSavepoint())
-                            .flatMap(s -> Optional.ofNullable(s.getLocation()));
+            if (FlinkStateSnapshotUtils.shouldCreateSnapshotResource(
+                    ctx.getOperatorConfig(), deployConfig)) {
+                savepointOpt = getLatestSavepointPathFromFlinkStateSnapshots(ctx);
+            } else {
+                savepointOpt =
+                        Optional.ofNullable(
+                                        ctx.getResource()
+                                                .getStatus()
+                                                .getJobStatus()
+                                                .getSavepointInfo()
+                                                .getLastSavepoint())
+                                .flatMap(s -> Optional.ofNullable(s.getLocation()));

Review Comment:
Actually I think the cleanest / simplest thing to do would be to add a new status field like:
```
snapshotReference / upgradeSnapshotReference
```
This should be updated during SAVEPOINT deployments (when we start from savepoint/checkpoint), after savepoint cancellation, or when observing the last state of a terminal job (basically every time we currently update the lastSavepoint for upgrade purposes).
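The `Optional.ofNullable(...).flatMap(...)` chain in the hunk above guards against two separate nulls: a missing last savepoint and a savepoint without a location. A minimal sketch of that pattern in isolation follows; `Savepoint` here is a hypothetical stand-in class with a single field, not the operator's status type.

```java
import java.util.Optional;

/** Hypothetical stand-in types for illustration; not the operator's API. */
final class SavepointLookupSketch {

    /** Minimal stand-in for a savepoint record whose location may be null. */
    static final class Savepoint {
        final String location;

        Savepoint(String location) {
            this.location = location;
        }
    }

    /** Empty if there is no savepoint at all, or if it has no location. */
    static Optional<String> lastSavepointLocation(Savepoint lastSavepoint) {
        return Optional.ofNullable(lastSavepoint)
                .flatMap(s -> Optional.ofNullable(s.location));
    }

    public static void main(String[] args) {
        System.out.println(lastSavepointLocation(null));                        // Optional.empty
        System.out.println(lastSavepointLocation(new Savepoint(null)));         // Optional.empty
        System.out.println(lastSavepointLocation(new Savepoint("s3://sp/1")));  // Optional[s3://sp/1]
    }
}
```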
[PR] [FLINK-33192] Fix Memory Leak in WindowOperator due to Improper Timer Cleanup [flink]
kartikeypant opened a new pull request, #25032:
URL: https://github.com/apache/flink/pull/25032

## What is the purpose of the change

* This pull request addresses an important state memory leak identified in the default window operator of Apache Flink.
* After this change, the cleanup timer should be registered for every window that is added to the window state, regardless of whether the window emits a result after it is fired.
* This change is associated with the following bug: [FLINK-33192](https://issues.apache.org/jira/projects/FLINK/issues/FLINK-33192)

## Brief change log

Modified `org.apache.flink.streaming.runtime.operators.windowing.WindowOperator` to register the cleanup timer for window state regardless of whether the window emits a result.

## Verifying this change

- `WindowOperatorTest` still passes, which covers functional correctness.
- Added a new test, `testCleanupTimerWithEmptyStateNoResultForTumblingWindows`. This test simulates an aggregate function that returns an empty result with tumbling windows. The test passing validates that the window contents are cleared at the time the cleanup timer is supposed to fire, so the memory leak should no longer occur after this change.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): yes
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable
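The idea behind the FLINK-33192 fix can be illustrated with a toy model, shown below. This is not Flink's `WindowOperator`: the class, its fields and the `producesResult` flag are all hypothetical, and the model only demonstrates the ordering the PR describes, namely that the cleanup timer is registered before any early exit, so a window that never emits a result still has its state removed when the timer fires.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/** Toy model of per-window state and cleanup timers; not Flink code. */
final class CleanupTimerSketch {

    private final Map<Long, List<String>> windowState = new HashMap<>();
    private final TreeSet<Long> cleanupTimers = new TreeSet<>();
    private final List<String> emitted = new ArrayList<>();

    /** Adds a value to the window ending at windowEnd. */
    void processElement(long windowEnd, String value, boolean producesResult) {
        windowState.computeIfAbsent(windowEnd, k -> new ArrayList<>()).add(value);
        // Register the cleanup timer first, so the early return below cannot skip it.
        cleanupTimers.add(windowEnd);
        if (!producesResult) {
            return; // nothing to emit, but the state above will still be cleaned up
        }
        emitted.add(windowEnd + ":" + value);
    }

    /** Fires every cleanup timer at or before 'now' and drops that window's state. */
    void advanceTime(long now) {
        Iterator<Long> it = cleanupTimers.iterator();
        while (it.hasNext()) {
            long windowEnd = it.next();
            if (windowEnd > now) {
                break; // timers are sorted, so later ones are not due yet
            }
            windowState.remove(windowEnd);
            it.remove();
        }
    }

    int liveWindows() {
        return windowState.size();
    }
}
```

If the `cleanupTimers.add` call were moved below the early return, the state of a window that produces no result would stay in `windowState` indefinitely, which is the shape of the leak described in the PR.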
Re: [PR] [FLINK-33192] Fix Memory Leak in WindowOperator due to Improper Timer Cleanup [flink]
flinkbot commented on PR #25032: URL: https://github.com/apache/flink/pull/25032#issuecomment-2212419439 ## CI report: * 7a901d8684e59a19dddb7db926adc89c3ca30ead UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]
mateczagany commented on code in PR #821:
URL: https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1667680584

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/snapshot/StateSnapshotReconciler.java: ##

@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.snapshot;
+
+import org.apache.flink.autoscaler.utils.DateTimeUtils;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.kubernetes.operator.api.CrdConstants;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotSpec;
+import org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotState;
+import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
+import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
+import org.apache.flink.kubernetes.operator.controller.FlinkStateSnapshotContext;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
+import org.apache.flink.util.Preconditions;
+
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import lombok.RequiredArgsConstructor;
+import org.apache.commons.lang3.ObjectUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.Optional;
+
+/** The reconciler for the {@link org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot}. */
+@RequiredArgsConstructor
+public class StateSnapshotReconciler {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StateSnapshotReconciler.class);
+
+    private final FlinkResourceContextFactory ctxFactory;
+
+    public void reconcile(FlinkStateSnapshotContext ctx) throws Exception {
+        var source = ctx.getResource().getSpec().getJobReference();
+        var resource = ctx.getResource();
+
+        var savepointState = resource.getStatus().getState();
+        if (!FlinkStateSnapshotState.TRIGGER_PENDING.equals(savepointState)) {
+            return;
+        }
+
+        if (resource.getSpec().isSavepoint()
+                && resource.getSpec().getSavepoint().getAlreadyExists()) {
+            LOG.info(
+                    "Snapshot {} is marked as completed in spec, skipping triggering savepoint.",
+                    resource.getMetadata().getName());
+            resource.getStatus().setState(FlinkStateSnapshotState.COMPLETED);
+            resource.getStatus().setPath(resource.getSpec().getSavepoint().getPath());
+            var time = DateTimeUtils.kubernetes(Instant.now());
+            resource.getStatus().setTriggerTimestamp(time);
+            resource.getStatus().setResultTimestamp(time);
+            return;
+        }
+
+        var secondaryResource =
+                ctx.getSecondaryResource()
+                        .orElseThrow(
+                                () ->
+                                        new RuntimeException(
+                                                String.format(
+                                                        "Secondary resource %s not found",
+                                                        source)));
+        if (!ReconciliationUtils.isJobRunning(secondaryResource.getStatus())) {
+            LOG.warn(
+                    "Target job {} for savepoint {} is not running, cannot trigger snapshot.",
+                    secondaryResource.getMetadata().getName(),
+                    resource.getMetadata().getName());
+            return;
+        }

Review Comment:
I have added a commit which makes sure that we abandon snapshots if the referenced Flink resource is not found or the job is not running, in both the observe and reconcile phases. An event will also be generated for the snapshot resource in these cases.
Re: [PR] [BP-1.19] [FLINK-33192] Fix Memory Leak in WindowOperator due to Improper Timer Cleanup [flink]
flinkbot commented on PR #25033: URL: https://github.com/apache/flink/pull/25033#issuecomment-2212437163 ## CI report: * d65b869026b08f322bb719516d3dac76da8f1b32 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[PR] [BP-1.20] [FLINK-33192] Fix Memory Leak in WindowOperator due to Improper Timer Cleanup [flink]
kartikeypant opened a new pull request, #25034: URL: https://github.com/apache/flink/pull/25034 1.20 backport for parent PR https://github.com/apache/flink/pull/24917 --- ## What is the purpose of the change * This pull request addresses an important state memory leak in the default window operator of Apache Flink. * After this change, a cleanup timer is registered for every window that is added to the window state, regardless of whether the window emits a result when it fires. * This change is associated with the following bug: [FLINK-33192](https://issues.apache.org/jira/projects/FLINK/issues/FLINK-33192) ## Brief change log Modified `org.apache.flink.streaming.runtime.operators.windowing.WindowOperator` to register the cleanup timer for window state regardless of whether the window emits a result. ## Verifying this change - `WindowOperatorTest` passes, confirming functional correctness. - Added a new test, `testCleanupTimerWithEmptyStateNoResultForTumblingWindows`. It simulates an aggregate function that returns an empty result with tumbling windows. The test passing validates that the window contents are cleared at the time the cleanup timer is supposed to fire, so the memory leak should no longer occur after this change. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): yes - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable
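The idea described in this PR (register a cleanup timer for every window added to state, whether or not it emits a result) can be illustrated with a toy model. This is only a sketch under stated assumptions: `CleanupTimerSketch`, `add`, `advanceTime` and `liveWindows` are hypothetical names and this is not the code of Flink's `WindowOperator`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of per-window state with cleanup timers (hypothetical, not Flink code). */
public class CleanupTimerSketch {
    // window end timestamp -> accumulated value
    private final Map<Long, Integer> windowState = new HashMap<>();
    // cleanup time -> window end timestamp, sorted by cleanup time
    private final TreeMap<Long, Long> cleanupTimers = new TreeMap<>();
    private final long allowedLateness;

    public CleanupTimerSketch(long allowedLateness) {
        this.allowedLateness = allowedLateness;
    }

    /** Adds a value to a window and always registers its cleanup timer. */
    public void add(long windowEnd, int value) {
        windowState.merge(windowEnd, value, Integer::sum);
        // Registered unconditionally: it does not matter whether the window
        // will ever emit a result when it fires.
        cleanupTimers.put(windowEnd + allowedLateness, windowEnd);
    }

    /** Fires every cleanup timer whose time is <= now and drops that window's state. */
    public void advanceTime(long now) {
        Map<Long, Long> due = cleanupTimers.headMap(now, true);
        for (Long windowEnd : due.values()) {
            windowState.remove(windowEnd);
        }
        due.clear(); // clearing the view also removes the fired timers
    }

    public int liveWindows() {
        return windowState.size();
    }
}
```

In this model a window whose aggregate is empty is still cleaned up once its timer fires, which is the behavior the new test is described as checking.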
Re: [PR] [BP-1.20] [FLINK-33192] Fix Memory Leak in WindowOperator due to Improper Timer Cleanup [flink]
flinkbot commented on PR #25034: URL: https://github.com/apache/flink/pull/25034#issuecomment-2212439134 ## CI report: * e1f98fb556f5e6a4392c65a2bb67ef577a28232a UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
Re: [PR] [FLINK-33211][table] support flink table lineage [flink]
FangYongs commented on PR #25012: URL: https://github.com/apache/flink/pull/25012#issuecomment-2212441328 Thanks @HuangZhenQiu , +1
Re: [PR] [FLINK-33211][table] support flink table lineage [flink]
FangYongs merged PR #25012: URL: https://github.com/apache/flink/pull/25012
Re: [PR] [docs][minor] Fix typo in log message: "oder" to "order" [flink-cdc]
Karl-WangSK commented on PR #3451: URL: https://github.com/apache/flink-cdc/pull/3451#issuecomment-2212443087 CI shows that NewlyAddedTableITCase.testNewlyAddedTableForExistsPipelineThrice failed, but it passes when I run it locally. Could someone trigger the CI again?
Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]
mateczagany commented on code in PR #821: URL: https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1667707042 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java: ## @@ -266,19 +304,31 @@ protected void restoreJob( Optional savepointOpt = Optional.empty(); if (spec.getJob().getUpgradeMode() != UpgradeMode.STATELESS) { -savepointOpt = -Optional.ofNullable( -ctx.getResource() -.getStatus() -.getJobStatus() -.getSavepointInfo() -.getLastSavepoint()) -.flatMap(s -> Optional.ofNullable(s.getLocation())); +if (FlinkStateSnapshotUtils.shouldCreateSnapshotResource( +ctx.getOperatorConfig(), deployConfig)) { +savepointOpt = getLatestSavepointPathFromFlinkStateSnapshots(ctx); +} else { +savepointOpt = +Optional.ofNullable( +ctx.getResource() +.getStatus() +.getJobStatus() +.getSavepointInfo() +.getLastSavepoint()) +.flatMap(s -> Optional.ofNullable(s.getLocation())); Review Comment: Thank you for describing this problem in such detail. It makes perfect sense; I had overlooked this part of the operator. I think I could tackle this with a new status field in the job status called `upgradeSnapshotReference`, as you proposed, but this still leaves out an important feature that was covered before: manual savepoints triggered by creating a new `FlinkStateSnapshot` resource, and periodic savepoints that create new `FlinkStateSnapshot` resources, will not update the new `upgradeSnapshotReference` field in the FlinkDeployment/FlinkSessionJob statuses. I see 3 solutions: - Update the FlinkDeployment/FlinkSessionJob status from the snapshot controller. I agree that this is not an optimal pattern, and I would avoid it. - List all `FlinkStateSnapshot` resources when re-deploying a Flink job, in addition to checking `upgradeSnapshotReference`. We could compare the times the snapshots were taken and use the most recent one. This might lead to confusion because, for example, starting a suspended job might not use the savepoint we can see in the status. - From the FlinkDeployment/FlinkSessionJob controllers, check the status of `FlinkStateSnapshot` resources with the help of `InformerEventSource`. I am not sure how viable this is.
[jira] [Created] (FLINK-35779) Update documentation for PubSubSinkV2
Ahmed Hamdy created FLINK-35779: --- Summary: Update documentation for PubSubSinkV2 Key: FLINK-35779 URL: https://issues.apache.org/jira/browse/FLINK-35779 Project: Flink Issue Type: Sub-task Components: Connectors / Google Cloud PubSub Reporter: Ahmed Hamdy Fix For: gcp-pubsub-3.2.0 Update PubSub documentation with {{PubSubSinkV2}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-35779][Connectors/Google PubSub] Update documentation for PubSubSinkV2 [flink-connector-gcp-pubsub]
vahmed-hamdy opened a new pull request, #29: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/29 ## Description Update PubSub documentation to include `PubSubSinkV2` changes ## Testing - This is a documentation change and needs no additional testing
Re: [PR] [FLINK-35779][Connectors/Google PubSub] Update documentation for PubSubSinkV2 [flink-connector-gcp-pubsub]
vahmed-hamdy commented on PR #29: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/29#issuecomment-2212482167 @snuyanzin could you please take a look at this PR?
[jira] [Updated] (FLINK-35779) Update documentation for PubSubSinkV2
[ https://issues.apache.org/jira/browse/FLINK-35779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35779: --- Labels: pull-request-available (was: ) > Update documentation for PubSubSinkV2 > - > > Key: FLINK-35779 > URL: https://issues.apache.org/jira/browse/FLINK-35779 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Google Cloud PubSub >Reporter: Ahmed Hamdy >Priority: Major > Labels: pull-request-available > Fix For: gcp-pubsub-3.2.0 > > > Update PubSub documentation with {{PubSubSinkV2}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35780) Support state migration from disabling to enabling ttl in RocksDBState
xiangyu feng created FLINK-35780: Summary: Support state migration from disabling to enabling ttl in RocksDBState Key: FLINK-35780 URL: https://issues.apache.org/jira/browse/FLINK-35780 Project: Flink Issue Type: Sub-task Components: Runtime / State Backends Reporter: xiangyu feng -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [WIP][FLINK-35780] Support state migration from disabling to enabling ttl for RocksDBState [flink]
xiangyuf opened a new pull request, #25035: URL: https://github.com/apache/flink/pull/25035 [FLINK-35780][state] Support state migration from disabling to enabling ttl in RocksDBState ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change Please make sure both new and modified tests in this PR follow [the conventions for tests defined in our code quality guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing). *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. 
*(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
[jira] [Updated] (FLINK-35780) Support state migration from disabling to enabling ttl in RocksDBState
[ https://issues.apache.org/jira/browse/FLINK-35780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35780: --- Labels: pull-request-available (was: ) > Support state migration from disabling to enabling ttl in RocksDBState > -- > > Key: FLINK-35780 > URL: https://issues.apache.org/jira/browse/FLINK-35780 > Project: Flink > Issue Type: Sub-task > Components: Runtime / State Backends >Reporter: xiangyu feng >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [WIP][FLINK-35780] Support state migration from disabling to enabling ttl for RocksDBState [flink]
flinkbot commented on PR #25035: URL: https://github.com/apache/flink/pull/25035#issuecomment-2212509210 ## CI report: * 55b44e1f22d3443912f6a9c150b8b22cdd9ed525 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
Re: [PR] [BP-1.18] [FLINK-33192] Fix Memory Leak in WindowOperator due to Improper Timer Cleanup [flink]
kartikeypant commented on PR #25032: URL: https://github.com/apache/flink/pull/25032#issuecomment-2212518819 @flinkbot - run azure
Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]
gyfora commented on code in PR #821: URL: https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1667757174 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java: ## @@ -266,19 +304,31 @@ protected void restoreJob( Optional savepointOpt = Optional.empty(); if (spec.getJob().getUpgradeMode() != UpgradeMode.STATELESS) { -savepointOpt = -Optional.ofNullable( -ctx.getResource() -.getStatus() -.getJobStatus() -.getSavepointInfo() -.getLastSavepoint()) -.flatMap(s -> Optional.ofNullable(s.getLocation())); +if (FlinkStateSnapshotUtils.shouldCreateSnapshotResource( +ctx.getOperatorConfig(), deployConfig)) { +savepointOpt = getLatestSavepointPathFromFlinkStateSnapshots(ctx); +} else { +savepointOpt = +Optional.ofNullable( +ctx.getResource() +.getStatus() +.getJobStatus() +.getSavepointInfo() +.getLastSavepoint()) +.flatMap(s -> Optional.ofNullable(s.getLocation())); Review Comment: @mateczagany I think the only good solution is to introduce a separate `upgradeSnapshotReference` that is only updated during the upgrade cycle (and during terminal job observe). Any snapshot observed this way is, logically speaking, always the latest. A manual / periodic snapshot should never override it; doing so would surely be an error in the logic, so this is absolutely not a problem. This way we actually separate the upgrade snapshot handling from savepoint taking / management, which have been conflated until now. In other words, we don't need any of the "3 solutions" :)
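The separation proposed in this review comment can be sketched roughly as follows. This is a minimal illustration, not the operator's code: only the field name `upgradeSnapshotReference` comes from the discussion, while the class `UpgradeSnapshotSketch` and its methods are hypothetical.

```java
import java.util.Optional;

/** Toy job status that keeps upgrade snapshots apart from other savepoints (hypothetical). */
public class UpgradeSnapshotSketch {
    // Written only during the upgrade cycle or when observing a terminal job.
    private String upgradeSnapshotReference;
    // Written by manual or periodic savepoints; never consulted for restore.
    private String lastManualOrPeriodicSavepoint;

    public void onUpgradeSnapshot(String path) {
        upgradeSnapshotReference = path;
    }

    public void onManualOrPeriodicSnapshot(String path) {
        // Deliberately does not touch upgradeSnapshotReference.
        lastManualOrPeriodicSavepoint = path;
    }

    /** Path to restore from: empty for stateless upgrades, else the upgrade snapshot. */
    public Optional<String> restorePath(boolean stateless) {
        if (stateless) {
            return Optional.empty();
        }
        return Optional.ofNullable(upgradeSnapshotReference);
    }

    public Optional<String> lastManualOrPeriodicSavepoint() {
        return Optional.ofNullable(lastManualOrPeriodicSavepoint);
    }
}
```

With this split, a periodic savepoint taken after the upgrade snapshot cannot change what a re-deployment restores from, which is the property the comment argues for.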
[PR] [FLINK-35016] catalog changes for model resource [flink]
lihaosky opened a new pull request, #25036: URL: https://github.com/apache/flink/pull/25036 ## What is the purpose of the change Catalog changes to support model resource ## Brief change log * Add `CatalogModel` related resource * Add model CRUD operations in `CatalogManager` and `Catalog` ## Verifying this change Unit test added ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (JavaDocs)
[jira] [Updated] (FLINK-35016) Catalog changes for model CRUD
[ https://issues.apache.org/jira/browse/FLINK-35016?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35016: --- Labels: pull-request-available (was: ) > Catalog changes for model CRUD > -- > > Key: FLINK-35016 > URL: https://issues.apache.org/jira/browse/FLINK-35016 > Project: Flink > Issue Type: Sub-task >Reporter: Hao Li >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35016] catalog changes for model resource [flink]
lihaosky commented on PR #25036: URL: https://github.com/apache/flink/pull/25036#issuecomment-2212606448 cc @twalthr
Re: [PR] [FLINK-35016] catalog changes for model resource [flink]
flinkbot commented on PR #25036: URL: https://github.com/apache/flink/pull/25036#issuecomment-2212607683 ## CI report: * dd6632371c5404780ab79e7ed308db46c6152154 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[PR] [FLINK-35750][runtime/metrics] Fix that the latency marker metrics ar… [flink]
RocMarshal opened a new pull request, #25037: URL: https://github.com/apache/flink/pull/25037 …en't updated after failover. (cherry picked from commit 615d19735b0691b57262a110e6078c3488349f5a) ## What is the purpose of the change BP for 1.17 of https://github.com/apache/flink/pull/25021
[PR] [FLINK-35750][runtime/metrics] Fix that the latency marker metrics aren't updated after failover. [flink]
RocMarshal opened a new pull request, #25038: URL: https://github.com/apache/flink/pull/25038 (cherry picked from commit 615d19735b0691b57262a110e6078c3488349f5a) ## What is the purpose of the change BP for 1.18 of https://github.com/apache/flink/pull/25021
[PR] [FLINK-35750][runtime/metrics] Fix that the latency marker metrics aren't updated after failover. [flink]
RocMarshal opened a new pull request, #25039: URL: https://github.com/apache/flink/pull/25039 (cherry picked from commit 615d19735b0691b57262a110e6078c3488349f5a) ## What is the purpose of the change BP for 1.19 of https://github.com/apache/flink/pull/25021
[PR] [FLINK-35750][runtime/metrics] Fix that the latency marker metrics aren't updated after failover. [flink]
RocMarshal opened a new pull request, #25040: URL: https://github.com/apache/flink/pull/25040 (cherry picked from commit 615d19735b0691b57262a110e6078c3488349f5a) ## What is the purpose of the change BP for 1.20 of https://github.com/apache/flink/pull/25021
Re: [PR] [FLINK-35750][runtime/metrics] Fix that the latency marker metrics aren't updated after failover. [flink]
flinkbot commented on PR #25037: URL: https://github.com/apache/flink/pull/25037#issuecomment-221297 ## CI report: * 1300c054a1ffa88c18f4c6c28a154207ce12bf98 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
Re: [PR] [FLINK-35750][runtime/metrics] Fix that the latency marker metrics aren't updated after failover. [flink]
flinkbot commented on PR #25038: URL: https://github.com/apache/flink/pull/25038#issuecomment-2212777977 ## CI report: * 8cb40a599c1e127b5f427be710f4a840c7fa4ca1 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
Re: [PR] [FLINK-35750][runtime/metrics] Fix that the latency marker metrics aren't updated after failover. [flink]
flinkbot commented on PR #25039: URL: https://github.com/apache/flink/pull/25039#issuecomment-2212781383 ## CI report: * f1c40d72af10e7ba07304c018cdd2588a07bf395 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
Re: [PR] [FLINK-35750][runtime/metrics] Fix that the latency marker metrics aren't updated after failover. [flink]
flinkbot commented on PR #25040: URL: https://github.com/apache/flink/pull/25040#issuecomment-2212781614 ## CI report: * 7f1b9491efa0ce3bb7a4815732d9ff10a82056bb UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
Re: [PR] [FLINK-35778] Escape URI reserved characters when creating file-merging directories [flink]
1996fanrui commented on code in PR #25030: URL: https://github.com/apache/flink/pull/25030#discussion_r1667861271 ## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java: ## @@ -773,6 +773,13 @@ static boolean shouldSyncAfterClosingLogicalFile(FileSystem fileSystem) { return true; } +private static String uriEscape(String input) { +// All reserved characters (RFC 2396) will be removed. This is enough for flink's resource +// id, job id and operator id. +// Ref: https://docs.oracle.com/javase/8/docs/api/index.html?java/net/URI.html +return input.replaceAll("[;/?:@&=+$,\\[\\]]", "-"); +} Review Comment: It seems to work with this PR. I still have 2 questions: 1. Does Flink or Java already have a utility class for removing reserved characters? - I guess the `FileMerging` feature is not the only one that needs to consider this, so I'm not sure whether `uriEscape` should be put here. 2. For testing, did you try running a real job locally with this PR? I'm not sure whether there are other issues when running the `FileMerging` feature on a standalone cluster.
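To make the behavior of the quoted `uriEscape` concrete, here is a small standalone sketch. It reuses the regular expression from the diff; the class name `UriEscapeSketch` is hypothetical. Note that the expression replaces each reserved character with `-` rather than removing it. On the reviewer's first question, the JDK's `java.net.URLEncoder` is one existing utility, shown here only for comparison: it percent-encodes for HTML forms instead of substituting a dash, so it is not a drop-in equivalent.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

/** Standalone illustration of the escaping discussed above (hypothetical class). */
public class UriEscapeSketch {

    /** Replaces each listed URI reserved character with '-', using the regex from the diff. */
    static String uriEscape(String input) {
        return input.replaceAll("[;/?:@&=+$,\\[\\]]", "-");
    }

    /** JDK alternative for comparison: form-style percent-encoding (Java 10+ overload). */
    static String formEncode(String input) {
        return URLEncoder.encode(input, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String id = "job:1/op@2";
        System.out.println(uriEscape(id));
        System.out.println(formEncode(id));
    }
}
```

The dash replacement is lossy (`a/b` and `a:b` both map to `a-b`), which appears acceptable for the ids mentioned in the code comment but may matter if the helper is reused elsewhere.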
[jira] [Created] (FLINK-35781) Make pipeline parallelism config optional
yux created FLINK-35781: --- Summary: Make pipeline parallelism config optional Key: FLINK-35781 URL: https://issues.apache.org/jira/browse/FLINK-35781 Project: Flink Issue Type: Improvement Components: Flink CDC Reporter: yux Currently, the Flink CDC `PIPELINE_PARALLELISM` option is mandatory in pipeline definitions, which is unnecessary since Flink already has a fallback parallelism option. -- This message was sent by Atlassian Jira (v8.20.10#820010)
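Making the option optional amounts to resolving it with a fallback. The following is a minimal sketch of that pattern only: the class `ParallelismFallbackSketch`, the method `resolveParallelism` and the key `"parallelism"` are hypothetical and are not taken from Flink CDC.

```java
import java.util.Map;

/** Illustrates an optional setting with a fallback value (hypothetical names). */
public class ParallelismFallbackSketch {

    /**
     * Returns the parallelism from the pipeline definition if present,
     * otherwise the fallback supplied by the caller.
     */
    static int resolveParallelism(Map<String, String> pipelineConfig, int fallback) {
        String raw = pipelineConfig.get("parallelism"); // hypothetical key
        if (raw == null || raw.trim().isEmpty()) {
            return fallback;
        }
        return Integer.parseInt(raw.trim());
    }
}
```

For example, `resolveParallelism(Map.of(), 1)` yields the fallback, while a definition that sets the key explicitly wins over it.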
Re: [PR] [BP-1.18] [FLINK-33192] Fix Memory Leak in WindowOperator due to Improper Timer Cleanup [flink]
fredia merged PR #25032: URL: https://github.com/apache/flink/pull/25032
Re: [PR] [BP-1.18] [FLINK-33192] Fix Memory Leak in WindowOperator due to Improper Timer Cleanup [flink]
fredia commented on PR #25032: URL: https://github.com/apache/flink/pull/25032#issuecomment-2212848554 @kartikeypant Thanks for the PR, LGTM
Re: [PR] [FLINK-35781][cli] Provide a default parallelism (1) for pipeline jobs [flink-cdc]
yuxiqian commented on PR #3458: URL: https://github.com/apache/flink-cdc/pull/3458#issuecomment-2212862904 @ruanhang1993 PTAL
[jira] [Updated] (FLINK-35781) Make pipeline parallelism config optional
[ https://issues.apache.org/jira/browse/FLINK-35781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35781: --- Labels: pull-request-available (was: ) > Make pipeline parallelism config optional > - > > Key: FLINK-35781 > URL: https://issues.apache.org/jira/browse/FLINK-35781 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: yux >Priority: Major > Labels: pull-request-available > > Currently, the Flink CDC `PIPELINE_PARALLELISM` option is mandatory in > pipeline definitions, which is unnecessary since Flink already > has a fallback parallelism option. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35345) FLIP-448: Introduce Pluggable Workflow Scheduler Interface for Materialized Table
[ https://issues.apache.org/jira/browse/FLINK-35345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17863643#comment-17863643 ] Rui Fan commented on FLINK-35345: - Hi [~lsy] [~hackergin] , I see that FLINK-35187 already has a release note. I'm not sure whether this Jira needs a release note as well. > FLIP-448: Introduce Pluggable Workflow Scheduler Interface for Materialized > Table > - > > Key: FLINK-35345 > URL: https://issues.apache.org/jira/browse/FLINK-35345 > Project: Flink > Issue Type: New Feature > Components: Table SQL / API, Table SQL / Ecosystem >Affects Versions: 1.20.0 >Reporter: dalongliu >Assignee: dalongliu >Priority: Major > Fix For: 1.20.0 > > > This is an umbrella issue for FLIP-448: Introduce Pluggable Workflow > Scheduler Interface for Materialized Table, for more detail, please see > https://cwiki.apache.org/confluence/display/FLINK/FLIP-448%3A+Introduce+Pluggable+Workflow+Scheduler+Interface+for+Materialized+Table. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [hotfix][doc] missing "STATISTICS" sql reserved keyword in zh doc [flink]
showuon opened a new pull request, #25041: URL: https://github.com/apache/flink/pull/25041 ## What is the purpose of the change In [FLINK-28493](https://github.com/apache/flink/pull/20506/), we implemented the "ANALYZE TABLE" syntax and documented the reserved keyword **STATISTICS** in the [English version of the doc](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/overview/#reserved-keywords), but we missed it in the [zh version of the doc](https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/sql/overview/#%e4%bf%9d%e7%95%99%e5%85%b3%e9%94%ae%e5%ad%97). ## Brief change log Added the missing reserved keyword **STATISTICS** to the zh doc. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? no
[jira] [Commented] (FLINK-35345) FLIP-448: Introduce Pluggable Workflow Scheduler Interface for Materialized Table
[ https://issues.apache.org/jira/browse/FLINK-35345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17863644#comment-17863644 ] Feng Jin commented on FLINK-35345: -- [~fanrui] This is mainly for use by FLINK-35187 and does not require a release note. > FLIP-448: Introduce Pluggable Workflow Scheduler Interface for Materialized > Table > - > > Key: FLINK-35345 > URL: https://issues.apache.org/jira/browse/FLINK-35345 > Project: Flink > Issue Type: New Feature > Components: Table SQL / API, Table SQL / Ecosystem >Affects Versions: 1.20.0 >Reporter: dalongliu >Assignee: dalongliu >Priority: Major > Fix For: 1.20.0 > > > This is an umbrella issue for FLIP-448: Introduce Pluggable Workflow > Scheduler Interface for Materialized Table, for more detail, please see > https://cwiki.apache.org/confluence/display/FLINK/FLIP-448%3A+Introduce+Pluggable+Workflow+Scheduler+Interface+for+Materialized+Table. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [hotfix][doc] missing "STATISTICS" sql reserved keyword in zh doc [flink]
flinkbot commented on PR #25041: URL: https://github.com/apache/flink/pull/25041#issuecomment-2212873141 ## CI report: * aa4b31c68b9831a2e78bfff7d976e2b32dd229bb UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Commented] (FLINK-35345) FLIP-448: Introduce Pluggable Workflow Scheduler Interface for Materialized Table
[ https://issues.apache.org/jira/browse/FLINK-35345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17863645#comment-17863645 ] Rui Fan commented on FLINK-35345: - thanks [~hackergin] for the quick feedback! :) > FLIP-448: Introduce Pluggable Workflow Scheduler Interface for Materialized > Table > - > > Key: FLINK-35345 > URL: https://issues.apache.org/jira/browse/FLINK-35345 > Project: Flink > Issue Type: New Feature > Components: Table SQL / API, Table SQL / Ecosystem >Affects Versions: 1.20.0 >Reporter: dalongliu >Assignee: dalongliu >Priority: Major > Fix For: 1.20.0 > > > This is an umbrella issue for FLIP-448: Introduce Pluggable Workflow > Scheduler Interface for Materialized Table, for more detail, please see > https://cwiki.apache.org/confluence/display/FLINK/FLIP-448%3A+Introduce+Pluggable+Workflow+Scheduler+Interface+for+Materialized+Table. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35737] Prevent Memory Leak by Closing MemoryExecutionGraphInfoStore on MiniCluster Shutdown [flink]
fengjiajie commented on PR #25009: URL: https://github.com/apache/flink/pull/25009#issuecomment-2212880693 Could you please take a look when you have time? @Samrat002 @1996fanrui
[jira] [Commented] (FLINK-35533) FLIP-459: Support Flink hybrid shuffle integration with Apache Celeborn
[ https://issues.apache.org/jira/browse/FLINK-35533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17863648#comment-17863648 ] Rui Fan commented on FLINK-35533: - Hi [~tanyuxin] [~Weijie Guo], may I know whether we need a release note for this JIRA? :) > FLIP-459: Support Flink hybrid shuffle integration with Apache Celeborn > --- > > Key: FLINK-35533 > URL: https://issues.apache.org/jira/browse/FLINK-35533 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Affects Versions: 1.20.0 >Reporter: Yuxin Tan >Assignee: Yuxin Tan >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > > > This is the jira for > [FLIP-459|https://cwiki.apache.org/confluence/display/FLINK/FLIP-459%3A+Support+Flink+hybrid+shuffle+integration+with+Apache+Celeborn]. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35137) Release flink-connector-jdbc v3.2.0 for Flink 1.19
[ https://issues.apache.org/jira/browse/FLINK-35137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17863653#comment-17863653 ] He Wang commented on FLINK-35137: - [~dannycranmer] Sorry I didn't make it clear, I was talking about the documentation site here. [https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/jdbc/] > Release flink-connector-jdbc v3.2.0 for Flink 1.19 > -- > > Key: FLINK-35137 > URL: https://issues.apache.org/jira/browse/FLINK-35137 > Project: Flink > Issue Type: Sub-task > Components: Connectors / JDBC >Reporter: Danny Cranmer >Assignee: Danny Cranmer >Priority: Major > Fix For: jdbc-3.2.0 > > > https://github.com/apache/flink-connector-jdbc -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35778] Escape URI reserved characters when creating file-merging directories [flink]
Zakelly commented on code in PR #25030: URL: https://github.com/apache/flink/pull/25030#discussion_r1667911678 ## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java: ## @@ -773,6 +773,13 @@ static boolean shouldSyncAfterClosingLogicalFile(FileSystem fileSystem) { return true; } +private static String uriEscape(String input) { +// All reserved characters (RFC 2396) will be removed. This is enough for flink's resource +// id, job id and operator id. +// Ref: https://docs.oracle.com/javase/8/docs/api/index.html?java/net/URI.html +return input.replaceAll("[;/?:@&=+$,\\[\\]]", "-"); +} Review Comment: 1. I did some investigation. There are some URI utilities in third-party dependencies, but their behavior is not the same as what we want. Instead of removing or replacing the reserved characters, they may treat the reserved characters as valid ones. `java.net.URLEncoder` seems feasible; however, the URL specification is a little different from the URI one. So I'd prefer doing a simple replace, since the ids from Flink are neat. 2. Well no, we never did that. We tried Per-Job/Session on yarn and local mini-cluster (E2E). I'm trying the local standalone cluster.
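The simple-replace approach discussed in this review comment can be tried in isolation. The following is a minimal sketch using only the JDK; the class name `UriEscapeSketch` is hypothetical and the code is not copied from `FileMergingSnapshotManagerBase`. It assumes, as the diff's regex suggests, that reserved characters are replaced with `-` rather than dropped. The character class covers the RFC 2396 reserved set plus the square brackets that RFC 2732 added.

```java
import java.util.regex.Pattern;

/** Hypothetical stand-alone sketch; not the Flink implementation. */
final class UriEscapeSketch {

    // RFC 2396 reserved characters (; / ? : @ & = + $ ,) plus '[' and ']' from RFC 2732.
    // Inside a character class, '?', '+' and '$' are literals; only the brackets need escaping.
    private static final Pattern RESERVED = Pattern.compile("[;/?:@&=+$,\\[\\]]");

    /** Replaces every reserved character with '-' and leaves all other characters untouched. */
    static String uriEscape(String input) {
        return RESERVED.matcher(input).replaceAll("-");
    }
}
```

For example, `UriEscapeSketch.uriEscape("job:1/op@2")` yields `job-1-op-2`, while an id made only of letters, digits, `_` and `-` passes through unchanged. By contrast, `java.net.URLEncoder` performs HTML form encoding (a space becomes `+`, reserved characters become `%XX` escapes), which is a different transformation from the one wanted here.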
Re: [PR] [FLINK-35750][runtime/metrics] Fix that the latency marker metrics aren't updated after failover. [flink]
RocMarshal commented on PR #25038: URL: https://github.com/apache/flink/pull/25038#issuecomment-2212931466 @flinkbot run azure
[PR] [hotfix] Fix the broken link of standalone deployment [flink]
Zakelly opened a new pull request, #25042: URL: https://github.com/apache/flink/pull/25042 (no comment)
[PR] [BP-1.20][hotfix] Fix the broken link of standalone deployment [flink]
Zakelly opened a new pull request, #25043: URL: https://github.com/apache/flink/pull/25043 (no comment)
Re: [PR] [hotfix] Fix the broken link of standalone deployment [flink]
flinkbot commented on PR #25042: URL: https://github.com/apache/flink/pull/25042#issuecomment-2212952936 ## CI report: * b334fec2d8479c8e9e30928fc3fd6e6fa3f8f296 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[PR] [BP-1.19][hotfix] Fix the broken link of standalone deployment [flink]
Zakelly opened a new pull request, #25044: URL: https://github.com/apache/flink/pull/25044 (no comment)
Re: [PR] [BP-1.20][hotfix] Fix the broken link of standalone deployment [flink]
flinkbot commented on PR #25043: URL: https://github.com/apache/flink/pull/25043#issuecomment-2212960152 ## CI report: * 3639c3545b47b7a06562c3deb316a11dccf3c885 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[PR] [hotfix] Fix the broken link of standalone deployment [flink]
Zakelly opened a new pull request, #25045: URL: https://github.com/apache/flink/pull/25045 (no comment)
Re: [PR] [BP-1.19][hotfix][doc] Fix the broken link of standalone deployment [flink]
flinkbot commented on PR #25044: URL: https://github.com/apache/flink/pull/25044#issuecomment-2212968492 ## CI report: * aa3002c4d9433402793d85bccbba3d0768c1761a UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
Re: [PR] [BP-1.18][hotfix][doc] Fix the broken link of standalone deployment [flink]
flinkbot commented on PR #25045: URL: https://github.com/apache/flink/pull/25045#issuecomment-2212968868 ## CI report: * 9dc582bf8e0ed6e19306431c9a71542bb238784b UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
Re: [PR] [BP-1.18] [FLINK-33192] Fix Memory Leak in WindowOperator due to Improper Timer Cleanup [flink]
kartikeypant commented on PR #25032: URL: https://github.com/apache/flink/pull/25032#issuecomment-2213000911 Thanks for the merge, @fredia. I have also created backport PRs for 1.19 (https://github.com/apache/flink/pull/25033) and 1.20 (https://github.com/apache/flink/pull/25034). Could you please review them as well?
[jira] [Commented] (FLINK-35753) ParquetColumnarRowInputFormatTest.testContinuousRepetition(int) failed due to ArrayIndexOutOfBoundsException
[ https://issues.apache.org/jira/browse/FLINK-35753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17863669#comment-17863669 ] Jingsong Lee commented on FLINK-35753: -- CC [~stephenwoo] > ParquetColumnarRowInputFormatTest.testContinuousRepetition(int) failed due to > ArrayIndexOutOfBoundsException > > > Key: FLINK-35753 > URL: https://issues.apache.org/jira/browse/FLINK-35753 > Project: Flink > Issue Type: Bug > Components: Build System / CI >Affects Versions: 1.20.0 >Reporter: Weijie Guo >Priority: Major > > {code:java} > Jul 02 01:23:50 01:23:50.105 [ERROR] > org.apache.flink.formats.parquet.ParquetColumnarRowInputFormatTest.testContinuousRepetition(int)[2] > -- Time elapsed: 1.886 s <<< ERROR! > Jul 02 01:23:50 java.lang.ArrayIndexOutOfBoundsException: 500 > Jul 02 01:23:50 at > org.apache.flink.table.data.columnar.vector.heap.AbstractHeapVector.setNullAt(AbstractHeapVector.java:72) > Jul 02 01:23:50 at > org.apache.flink.formats.parquet.vector.reader.NestedColumnReader.setFieldNullFalg(NestedColumnReader.java:251) > Jul 02 01:23:50 at > org.apache.flink.formats.parquet.vector.reader.NestedColumnReader.readArray(NestedColumnReader.java:221) > Jul 02 01:23:50 at > org.apache.flink.formats.parquet.vector.reader.NestedColumnReader.readData(NestedColumnReader.java:101) > Jul 02 01:23:50 at > org.apache.flink.formats.parquet.vector.reader.NestedColumnReader.readToVector(NestedColumnReader.java:90) > Jul 02 01:23:50 at > org.apache.flink.formats.parquet.ParquetVectorizedInputFormat$ParquetReader.nextBatch(ParquetVectorizedInputFormat.java:413) > Jul 02 01:23:50 at > org.apache.flink.formats.parquet.ParquetVectorizedInputFormat$ParquetReader.readBatch(ParquetVectorizedInputFormat.java:381) > Jul 02 01:23:50 at > org.apache.flink.connector.file.src.util.Utils.forEachRemaining(Utils.java:81) > {code} > 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60587&view=logs&j=1c002d28-a73d-5309-26ee-10036d8476b4&t=d1c117a6-8f13-5466-55f0-d48dbb767fcd&l=12002 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [BP-1.17][FLINK-35750][runtime/metrics] Fix that the latency marker metrics aren't updated after failover. [flink]
RocMarshal commented on PR #25037: URL: https://github.com/apache/flink/pull/25037#issuecomment-2213100328 @flinkbot run azure
Re: [PR] [BP-1.18][FLINK-35750][runtime/metrics] Fix that the latency marker metrics aren't updated after failover. [flink]
RocMarshal commented on PR #25038: URL: https://github.com/apache/flink/pull/25038#issuecomment-2213100224 @flinkbot run azure
Re: [PR] [BP-1.19][FLINK-35731][runtime] Fix incorrect parallelism configured detection for Sink V2. [flink]
zhuzhurk merged PR #25024: URL: https://github.com/apache/flink/pull/25024
[jira] [Closed] (FLINK-35731) Sink V2 operator is mistakenly assumed always to be parallelism configured
[ https://issues.apache.org/jira/browse/FLINK-35731?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhu Zhu closed FLINK-35731. --- Fix Version/s: 1.20.0 1.19.2 Resolution: Fixed 1.20: 2b3a47d98bf06ffde3d6d7c850414cc07a47d3f2 1.19: 6caf576d582bf3b2fcb9c6ef71f46115c58ea59c > Sink V2 operator is mistakenly assumed always to be parallelism configured > -- > > Key: FLINK-35731 > URL: https://issues.apache.org/jira/browse/FLINK-35731 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.17.2, 1.18.1, 1.20.0, 1.19.1 >Reporter: Junrui Li >Assignee: Junrui Li >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0, 1.19.2 > > > Currently, the Sink V2 operator is always marked as parallelism configured, > which prevents parallelism from being inferred. This can cause confusion for > users utilizing the Adaptive Batch scheduler. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-35781) Make pipeline parallelism config optional
[ https://issues.apache.org/jira/browse/FLINK-35781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ruan Hang reassigned FLINK-35781: - Assignee: yux > Make pipeline parallelism config optional > - > > Key: FLINK-35781 > URL: https://issues.apache.org/jira/browse/FLINK-35781 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: yux >Assignee: yux >Priority: Major > Labels: pull-request-available > > Currently, Flink CDC `PIPELINE_PARALLELISM` option is forcefully required in > pipeline definition, which turns out to be unnecessary since Flink already > has a fallback parallelism option. -- This message was sent by Atlassian Jira (v8.20.10#820010)
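The fallback behavior this issue asks for can be illustrated with a small sketch. Everything below is hypothetical: the class, the method, and the way the pipeline definition is modeled as a plain string map are assumptions for illustration and do not reflect Flink CDC's actual option handling. Only the `parallelism` key name is taken from the pipeline options table quoted elsewhere in this thread.

```java
import java.util.Map;

/** Hypothetical helper illustrating an optional pipeline parallelism; not part of Flink CDC. */
final class PipelineParallelismSketch {

    /** Assumed key name for the pipeline-level option. */
    static final String PARALLELISM_KEY = "parallelism";

    /**
     * Returns the parallelism from the pipeline definition if it is present,
     * otherwise the caller-supplied fallback (for example, Flink's own default).
     */
    static int resolveParallelism(Map<String, String> pipelineConfig, int fallback) {
        String raw = pipelineConfig.get(PARALLELISM_KEY);
        if (raw == null || raw.trim().isEmpty()) {
            return fallback; // option omitted: do not fail, use the fallback
        }
        int parallelism = Integer.parseInt(raw.trim());
        if (parallelism < 1) {
            throw new IllegalArgumentException("parallelism must be >= 1, got " + parallelism);
        }
        return parallelism;
    }
}
```

With this shape, a pipeline definition that omits the option resolves to the fallback value, and an explicit `parallelism: 4` still takes precedence.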
Re: [PR] [FLINK-35713][cdc-compose] Add sink PARALLELISM for flink-cdc. [flink-cdc]
yuxiqian commented on code in PR #3438: URL: https://github.com/apache/flink-cdc/pull/3438#discussion_r1668075424 ## flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java: ## Review Comment: In the future, defining multiple sinks in a single pipeline job might be supported, and one might want to define the parallelism of each sink individually. Would it be better to put `sink.parallelism` as an option in the `sink:` block instead of a global pipeline option?
[jira] [Updated] (FLINK-35713) Add sink PARALLELISM for flink-cdc
[ https://issues.apache.org/jira/browse/FLINK-35713?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35713: --- Labels: pull-request-available (was: ) > Add sink PARALLELISM for flink-cdc > -- > > Key: FLINK-35713 > URL: https://issues.apache.org/jira/browse/FLINK-35713 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Affects Versions: cdc-3.2.0 >Reporter: wuzexian >Priority: Major > Labels: pull-request-available > Fix For: cdc-3.2.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34572] Support OceanBase Jdbc Catalog [flink-connector-jdbc]
whhe commented on code in PR #109: URL: https://github.com/apache/flink-connector-jdbc/pull/109#discussion_r1668077202 ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java: ## @@ -82,6 +84,14 @@ public static AbstractJdbcCatalog createCatalog( } else if (dialect instanceof MySqlDialect) { return new MySqlCatalog( userClassLoader, catalogName, defaultDatabase, baseUrl, connectionProperties); +} else if (dialect instanceof OceanBaseDialect) { Review Comment: It seems better, will have a try.
Re: [PR] [FLINK-35781][cli] Provide a default parallelism (1) for pipeline jobs [flink-cdc]
ruanhang1993 commented on code in PR #3458: URL: https://github.com/apache/flink-cdc/pull/3458#discussion_r1668081572 ## docs/content.zh/docs/core-concept/data-pipeline.md: ## @@ -98,5 +98,5 @@ The following config options of Data Pipeline level are supported: | parameter | meaning | optional/required | |-|-|---| | name| The name of the pipeline, which will be submitted to the Flink cluster as the job name. | optional | -| parallelism | The global parallelism of the pipeline. | required | Review Comment: Please add the default value to this table.
Re: [PR] [FLINK-35781][cli] Provide a default parallelism (1) for pipeline jobs [flink-cdc]
ruanhang1993 commented on code in PR #3458: URL: https://github.com/apache/flink-cdc/pull/3458#discussion_r1668081572 ## docs/content.zh/docs/core-concept/data-pipeline.md: ## @@ -98,5 +98,5 @@ The following config options of Data Pipeline level are supported: | parameter | meaning | optional/required | |-|-|---| | name| The name of the pipeline, which will be submitted to the Flink cluster as the job name. | optional | -| parallelism | The global parallelism of the pipeline. | required | Review Comment: Please add the default value to its description.
Re: [PR] [FLINK-34572] Support OceanBase Jdbc Catalog [flink-connector-jdbc]
whhe commented on code in PR #109: URL: https://github.com/apache/flink-connector-jdbc/pull/109#discussion_r1668082863 ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/catalog/OceanBaseTypeMapper.java: ## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.databases.oceanbase.catalog; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.dialect.JdbcDialectTypeMapper; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.DecimalType; + +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Types; + +/** OceanBaseTypeMapper util class. 
*/ +@Internal +public class OceanBaseTypeMapper implements JdbcDialectTypeMapper { + +private static final int RAW_TIME_LENGTH = 10; +private static final int RAW_TIMESTAMP_LENGTH = 19; + +private static final int TYPE_BINARY_FLOAT = 100; +private static final int TYPE_BINARY_DOUBLE = 101; + +private final String compatibleMode; + +public OceanBaseTypeMapper(String compatibleMode) { +this.compatibleMode = compatibleMode; +} + +@Override +public DataType mapping(ObjectPath tablePath, ResultSetMetaData metadata, int colIndex) +throws SQLException { +String typeName = metadata.getColumnTypeName(colIndex).toUpperCase(); +int jdbcType = metadata.getColumnType(colIndex); +String columnName = metadata.getColumnName(colIndex); +int precision = metadata.getPrecision(colIndex); +int scale = metadata.getScale(colIndex); +switch (jdbcType) { +case Types.BIT: +return DataTypes.BOOLEAN(); +case Types.TINYINT: +if (isUnsignedType(typeName) || precision > 4) { Review Comment: OK
[jira] [Updated] (FLINK-35665) Release Testing: FLIP-441: Show the JobType and remove Execution Mode on Flink WebUI
[ https://issues.apache.org/jira/browse/FLINK-35665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zakelly Lan updated FLINK-35665: Attachment: image-2024-07-08-14-51-11-241.png > Release Testing: FLIP-441: Show the JobType and remove Execution Mode on > Flink WebUI > -- > > Key: FLINK-35665 > URL: https://issues.apache.org/jira/browse/FLINK-35665 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Web Frontend >Reporter: Rui Fan >Assignee: Zakelly Lan >Priority: Blocker > Fix For: 1.20.0 > > Attachments: image-2024-06-21-15-51-53-480.png, > image-2024-07-08-14-51-11-241.png, image-2024-07-08-14-51-35-796.png > > > Test suggestion: > > 1. Using this following job to check the jobType > {code:java} > import org.apache.flink.api.common.RuntimeExecutionMode; > import org.apache.flink.api.common.eventtime.WatermarkStrategy; > import org.apache.flink.api.common.functions.MapFunction; > import org.apache.flink.api.common.typeinfo.Types; > import > org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; > import org.apache.flink.configuration.Configuration; > import org.apache.flink.configuration.RestOptions; > import org.apache.flink.connector.datagen.source.DataGeneratorSource; > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > /** Test for showing job type in Flink WebUI. 
*/ > public class JobTypeDemo { > public static void main(String[] args) throws Exception { > Configuration conf = new Configuration(); > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(conf); > env.setRuntimeMode(RuntimeExecutionMode.STREAMING); > // env.setRuntimeMode(RuntimeExecutionMode.BATCH); > // env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); > DataGeneratorSource generatorSource = > new DataGeneratorSource<>( > value -> value, > 600, > RateLimiterStrategy.perSecond(10), > Types.LONG); > env.fromSource(generatorSource, WatermarkStrategy.noWatermarks(), > "Data Generator") > .map((MapFunction) value -> value) > .name("Map___1") > .print(); > env.execute(JobTypeDemo.class.getSimpleName()); > } > } {code} > 2. Start it and check if the jobType is Streaming in Flink web UI. > !image-2024-06-21-15-51-53-480.png|width=1835,height=768! > 3. Applying the env.setRuntimeMode(RuntimeExecutionMode.BATCH);, and check if > the jobType is Batch in Flink web UI. > 4. Applying the env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);, and > check if the jobType is Batch in Flink web UI. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35665) Release Testing: FLIP-441: Show the JobType and remove Execution Mode on Flink WebUI
[ https://issues.apache.org/jira/browse/FLINK-35665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zakelly Lan updated FLINK-35665: Attachment: image-2024-07-08-14-51-35-796.png > Release Testing: FLIP-441: Show the JobType and remove Execution Mode on > Flink WebUI > -- > > Key: FLINK-35665 > URL: https://issues.apache.org/jira/browse/FLINK-35665 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Web Frontend >Reporter: Rui Fan >Assignee: Zakelly Lan >Priority: Blocker > Fix For: 1.20.0 > > Attachments: image-2024-06-21-15-51-53-480.png, > image-2024-07-08-14-51-11-241.png, image-2024-07-08-14-51-35-796.png > > > Test suggestion: > > 1. Using this following job to check the jobType > {code:java} > import org.apache.flink.api.common.RuntimeExecutionMode; > import org.apache.flink.api.common.eventtime.WatermarkStrategy; > import org.apache.flink.api.common.functions.MapFunction; > import org.apache.flink.api.common.typeinfo.Types; > import > org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; > import org.apache.flink.configuration.Configuration; > import org.apache.flink.configuration.RestOptions; > import org.apache.flink.connector.datagen.source.DataGeneratorSource; > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > /** Test for showing job type in Flink WebUI. 
*/ > public class JobTypeDemo { > public static void main(String[] args) throws Exception { > Configuration conf = new Configuration(); > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(conf); > env.setRuntimeMode(RuntimeExecutionMode.STREAMING); > // env.setRuntimeMode(RuntimeExecutionMode.BATCH); > // env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); > DataGeneratorSource generatorSource = > new DataGeneratorSource<>( > value -> value, > 600, > RateLimiterStrategy.perSecond(10), > Types.LONG); > env.fromSource(generatorSource, WatermarkStrategy.noWatermarks(), > "Data Generator") > .map((MapFunction) value -> value) > .name("Map___1") > .print(); > env.execute(JobTypeDemo.class.getSimpleName()); > } > } {code} > 2. Start it and check if the jobType is Streaming in Flink web UI. > !image-2024-06-21-15-51-53-480.png|width=1835,height=768! > 3. Applying the env.setRuntimeMode(RuntimeExecutionMode.BATCH);, and check if > the jobType is Batch in Flink web UI. > 4. Applying the env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);, and > check if the jobType is Batch in Flink web UI. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35665) Release Testing: FLIP-441: Show the JobType and remove Execution Mode on Flink WebUI
[ https://issues.apache.org/jira/browse/FLINK-35665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zakelly Lan updated FLINK-35665: Attachment: image-2024-07-08-14-52-32-778.png > Release Testing: FLIP-441: Show the JobType and remove Execution Mode on > Flink WebUI > -- > > Key: FLINK-35665 > URL: https://issues.apache.org/jira/browse/FLINK-35665 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Web Frontend >Reporter: Rui Fan >Assignee: Zakelly Lan >Priority: Blocker > Fix For: 1.20.0 > > Attachments: image-2024-06-21-15-51-53-480.png, > image-2024-07-08-14-51-11-241.png, image-2024-07-08-14-51-35-796.png, > image-2024-07-08-14-52-32-778.png > > > Test suggestion: > > 1. Using this following job to check the jobType > {code:java} > import org.apache.flink.api.common.RuntimeExecutionMode; > import org.apache.flink.api.common.eventtime.WatermarkStrategy; > import org.apache.flink.api.common.functions.MapFunction; > import org.apache.flink.api.common.typeinfo.Types; > import > org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; > import org.apache.flink.configuration.Configuration; > import org.apache.flink.configuration.RestOptions; > import org.apache.flink.connector.datagen.source.DataGeneratorSource; > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > /** Test for showing job type in Flink WebUI. 
*/ > public class JobTypeDemo { > public static void main(String[] args) throws Exception { > Configuration conf = new Configuration(); > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(conf); > env.setRuntimeMode(RuntimeExecutionMode.STREAMING); > // env.setRuntimeMode(RuntimeExecutionMode.BATCH); > // env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); > DataGeneratorSource generatorSource = > new DataGeneratorSource<>( > value -> value, > 600, > RateLimiterStrategy.perSecond(10), > Types.LONG); > env.fromSource(generatorSource, WatermarkStrategy.noWatermarks(), > "Data Generator") > .map((MapFunction) value -> value) > .name("Map___1") > .print(); > env.execute(JobTypeDemo.class.getSimpleName()); > } > } {code} > 2. Start it and check if the jobType is Streaming in Flink web UI. > !image-2024-06-21-15-51-53-480.png|width=1835,height=768! > 3. Applying the env.setRuntimeMode(RuntimeExecutionMode.BATCH);, and check if > the jobType is Batch in Flink web UI. > 4. Applying the env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);, and > check if the jobType is Batch in Flink web UI. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35713][cdc-compose] Add sink PARALLELISM for flink-cdc. [flink-cdc]
proletarians commented on code in PR #3438:
URL: https://github.com/apache/flink-cdc/pull/3438#discussion_r1668085508

## flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java: ##

Review Comment:
   Got it! I will follow up on this.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-35665) Release Testing: FLIP-441: Show the JobType and remove Execution Mode on Flink WebUI
[ https://issues.apache.org/jira/browse/FLINK-35665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17863677#comment-17863677 ]

Zakelly Lan commented on FLINK-35665:

Verified via:
# Download flink-1.20-rc0 from [https://dist.apache.org/repos/dist/dev/flink/flink-1.20.0-rc0/]
# Compile the JobTypeDemo that Rui provided.
# Submit a job on YARN in per-job mode via the Flink CLI.
# Open the Flink UI and check the Job Type.

When applying {{env.setRuntimeMode(RuntimeExecutionMode.STREAMING)}} it shows:
!image-2024-07-08-14-52-32-778.png!

When applying {{env.setRuntimeMode(RuntimeExecutionMode.BATCH)}} it shows:
!image-2024-07-08-14-51-11-241.png!

When applying {{env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC)}} it shows:
!image-2024-07-08-14-51-35-796.png!

So it seems to work as expected.
[jira] [Resolved] (FLINK-35665) Release Testing: FLIP-441: Show the JobType and remove Execution Mode on Flink WebUI
[ https://issues.apache.org/jira/browse/FLINK-35665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zakelly Lan resolved FLINK-35665.
    Resolution: Fixed