[ 
https://issues.apache.org/jira/browse/FLINK-37320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luca Castelli updated FLINK-37320:
----------------------------------
    Description: 
Hello,

I believe I've found bugs within the observation logic for both finite 
streaming and batch jobs. This is a follow-up to: 
[https://lists.apache.org/thread/xvsk4fmlqln092cdolvox4dgko0pw81k].

*For finite streaming jobs:*
 # The job finishes successfully and the job status changes to FINISHED
 # TTL (kubernetes.operator.jm-deployment.shutdown-ttl) cleanup removes the JM 
deployments and clears HA configmap data
 # On the next loop, the observer sees MISSING JM and changes the job status 
from FINISHED to RECONCILING

The job had reached a terminal state. It shouldn't have been set back to 
RECONCILING.

This later leads to an operator error when a recovery attempt is triggered. The 
recovery is triggered because the JM is MISSING, the status is RECONCILING, 
the spec shows RUNNING, and HA is enabled. The recovery fails with 
validateHaMetadataExists throwing UpgradeFailureException.

At that point the deployment gets stuck in a loop with status RECONCILING and 
UpgradeFailureException thrown on each cycle. I've attached operator logs 
showing this.

*Proposed solution:* I think the fix would be to wrap 
[AbstractFlinkDeploymentObserver.observeJmDeployment|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java#L155]
 in an if-statement that checks that the job is not in a terminal state. Happy 
to discuss and/or put up the two-line PR.
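
As a rough illustration of that guard, here is a minimal, self-contained sketch. It does not use the operator's actual classes: the JobStatus and JmStatus enums, the TERMINAL set, and the observe method are hypothetical stand-ins, and treating FINISHED, FAILED and CANCELED as the terminal states is an assumption modelled on Flink's globally terminal job states.

```java
import java.util.EnumSet;
import java.util.Set;

class TerminalStateGuardSketch {

    // Stand-in for the job status tracked by the operator (hypothetical, reduced set).
    enum JobStatus { RUNNING, RECONCILING, FINISHED, FAILED, CANCELED }

    // Stand-in for the observed JobManager deployment status (hypothetical).
    enum JmStatus { READY, MISSING }

    // Assumption: these are the states that must never be overwritten by the observer.
    static final Set<JobStatus> TERMINAL =
            EnumSet.of(JobStatus.FINISHED, JobStatus.FAILED, JobStatus.CANCELED);

    // Simplified observer step: returns the job status after observing the JM deployment.
    static JobStatus observe(JobStatus current, JmStatus jm) {
        if (TERMINAL.contains(current)) {
            // The guard: a terminal job keeps its status even if the JM is gone.
            return current;
        }
        // Without the guard, a MISSING JM always moves the job to RECONCILING.
        return jm == JmStatus.MISSING ? JobStatus.RECONCILING : current;
    }

    public static void main(String[] args) {
        // JM removed by TTL cleanup after the job finished: status stays FINISHED.
        System.out.println(observe(JobStatus.FINISHED, JmStatus.MISSING));
        // JM lost while the job was still running: status becomes RECONCILING.
        System.out.println(observe(JobStatus.RUNNING, JmStatus.MISSING));
    }
}
```

With the guard in place, the first call prints FINISHED and the second prints RECONCILING, so recovery would only be attempted for jobs that have not yet terminated.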

 

*For batch jobs:*

The root error (full logs attached):
{code:java}
Caused by: org.apache.flink.runtime.rest.util.RestClientException: 
[org.apache.flink.runtime.rest.handler.RestHandlerException: Checkpointing has 
not been enabled.
    at 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler.createCheckpointingStatistics(CheckpointingStatisticsHandler.java:107){code}
 # The job finishes successfully and the job status changes to FINISHED in 
memory
 # When observing the FlinkCluster, in 
[AbstractFlinkService.getCheckpointInfo|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L570],
 the operator successfully connects to the Flink REST API
 # It tries to get checkpoint statistics via CheckpointingStatisticsHandler, 
but the handler throws an error because checkpointing is not enabled. This is 
actually an expected response since it's a BATCH job

E.g. 
[http://localhost:8081/jobs/job-id/checkpoints|http://localhost:8081/jobs/d12843a2a326d5268e550012e620cdca/checkpoints]

{code:java}
{ "errors": [ "org.apache.flink.runtime.rest.handler.RestHandlerException: 
Checkpointing has not been enabled.\n\tat 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler.createCheckpointingStatistics(CheckpointingStatisticsHandler.java:107)\n\tat
 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler.handleCheckpointStatsRequest(CheckpointingStatisticsHandler.java:85)\n\tat
 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler.handleCheckpointStatsRequest(CheckpointingStatisticsHandler.java:59)\n\tat
 
org.apache.flink.runtime.rest.handler.job.checkpoints.AbstractCheckpointStatsHandler.lambda$handleRequest$1(AbstractCheckpointStatsHandler.java:89)\n\tat
 
java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)\n\tat
 
java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)\n\tat
 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat
 java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat 
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)\n\tat
 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat
 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat
 java.base/java.lang.Thread.run(Thread.java:829)\n" ] }{code}
     4. The REST endpoint exists but returns the exception, which then bubbles 
up to a ReconciliationException that is thrown again on every loop
     5. The deployment is never cleaned up and never shows FINISHED in the CR

*Proposed solution:* I think we should check in 
[SnapshotObserver.observeSavepointStatus|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java#L80]
 whether the job is a BATCH job. If it is, return early without observing 
anything. Also happy to discuss and/or put up a PR.
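
The early return could look roughly like the following sketch. It is self-contained and does not use the operator's real types: RuntimeMode, observeSnapshots, and the fetchCheckpointInfo supplier are hypothetical stand-ins, and how the operator would actually detect a BATCH job is left open here.

```java
import java.util.Optional;
import java.util.function.Supplier;

class BatchSnapshotGuardSketch {

    // Stand-in for the job's execution mode (hypothetical).
    enum RuntimeMode { STREAMING, BATCH }

    // Simplified snapshot observation step. fetchCheckpointInfo stands in for the
    // REST call to /jobs/<job-id>/checkpoints, which fails when checkpointing is disabled.
    static Optional<String> observeSnapshots(RuntimeMode mode, Supplier<String> fetchCheckpointInfo) {
        if (mode == RuntimeMode.BATCH) {
            // Early return: never query checkpoint statistics for a BATCH job.
            return Optional.empty();
        }
        return Optional.of(fetchCheckpointInfo.get());
    }

    public static void main(String[] args) {
        // Simulates the REST handler rejecting the request.
        Supplier<String> failing = () -> {
            throw new IllegalStateException("Checkpointing has not been enabled.");
        };
        // BATCH: the failing call is skipped, so no exception is thrown.
        System.out.println(observeSnapshots(RuntimeMode.BATCH, failing).isPresent());
        // STREAMING: checkpoint info is fetched as before.
        System.out.println(observeSnapshots(RuntimeMode.STREAMING, () -> "checkpoint-42").get());
    }
}
```

Running this prints false and then checkpoint-42: the BATCH path never reaches the call that would throw, while the STREAMING path is unchanged.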

  was:
Hello,

I believe I've found bugs within the observation logic for both finite 
streaming and batch jobs. This is a follow-up to: 
[https://lists.apache.org/thread/xvsk4fmlqln092cdolvox4dgko0pw81k].

*For finite streaming jobs:*
 # The job finishes successfully and the job status changes to FINISHED
 # TTL (kubernetes.operator.jm-deployment.shutdown-ttl) cleanup removes the JM 
deployments and clears HA configmap data
 # On the next loop, the observer sees MISSING JM and changes the job status 
from FINISHED to RECONCILING

The job had reached a terminal state. It shouldn't have been set back to 
RECONCILING.

This leads to an operator error later when a recovery attempt is triggered. The 
recovery is triggered because the JM is MISSING, the status is RECONCILING, 
spec shows RUNNING, and HA enabled. The recovery fails with 
validateHaMetadataExists throwing UpgradeFailureException.

At that point the deployment gets stuck in a loop with status RECONCILING and 
UpgradeFailureException thrown on each cycle. I've attached operator logs 
showing this.

I think the fix would be to wrap 
[AbstractFlinkDeploymentObserver.observeJmDeployment|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java#L155]
 in an if-statement that checks the job is not in a terminal state. Happy to 
discuss and/or put up the 2 line code change PR.

*For batch jobs:*

The root error:
{code:java}
Caused by: org.apache.flink.runtime.rest.util.RestClientException: 
[org.apache.flink.runtime.rest.handler.RestHandlerException: Checkpointing has 
not been enabled.
    at 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler.createCheckpointingStatistics(CheckpointingStatisticsHandler.java:107){code}
 # In 
[AbstractFlinkService.getCheckpointInfo|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L570],
 the operator successfully connects to the Flink REST API
 # It tries to get checkpoint statistics via CheckpointingStatisticsHandler
 # The handler throws an error because checkpointing is not enabled (since it's 
a BATCH job)

[http://localhost:8081/jobs/job-id/checkpoints|http://localhost:8081/jobs/d12843a2a326d5268e550012e620cdca/checkpoints]

Returns
{code:java}
{ "errors": [ "org.apache.flink.runtime.rest.handler.RestHandlerException: 
Checkpointing has not been enabled.\n\tat 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler.createCheckpointingStatistics(CheckpointingStatisticsHandler.java:107)\n\tat
 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler.handleCheckpointStatsRequest(CheckpointingStatisticsHandler.java:85)\n\tat
 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler.handleCheckpointStatsRequest(CheckpointingStatisticsHandler.java:59)\n\tat
 
org.apache.flink.runtime.rest.handler.job.checkpoints.AbstractCheckpointStatsHandler.lambda$handleRequest$1(AbstractCheckpointStatsHandler.java:89)\n\tat
 
java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)\n\tat
 
java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)\n\tat
 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat
 java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat 
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)\n\tat
 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat
 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat
 java.base/java.lang.Thread.run(Thread.java:829)\n" ] }{code}

 # The REST endpoint exists but returns the exception which bubbles up to a 
ReconciliationException

 


> FINISHED jobs incorrectly being set to RECONCILING
> --------------------------------------------------
>
>                 Key: FLINK-37320
>                 URL: https://issues.apache.org/jira/browse/FLINK-37320
>             Project: Flink
>          Issue Type: Bug
>          Components: Kubernetes Operator
>    Affects Versions: kubernetes-operator-1.10.0
>         Environment: I've attached the flinkdeployment CR and operator-config 
> I used to locally replicate.
>            Reporter: Luca Castelli
>            Priority: Minor
>         Attachments: operator-config.yaml, operator-log-batch-job.log, 
> operator-log-finite-streaming-job.log, test-batch-job.yaml, 
> test-finite-streaming-job.yaml
>
>
> Hello,
> I believe I've found bugs within the observation logic for both finite 
> streaming and batch jobs. This is a follow-up to: 
> [https://lists.apache.org/thread/xvsk4fmlqln092cdolvox4dgko0pw81k].
> *For finite streaming jobs:*
>  # The job finishes successfully and the job status changes to FINISHED
>  # TTL (kubernetes.operator.jm-deployment.shutdown-ttl) cleanup removes the 
> JM deployments and clears HA configmap data
>  # On the next loop, the observer sees MISSING JM and changes the job status 
> from FINISHED to RECONCILING
> The job had reached a terminal state. It shouldn't have been set back to 
> RECONCILING.
> This later leads to an operator error when a recovery attempt is triggered. 
> The recovery is triggered because the JM is MISSING, the status is 
> RECONCILING, the spec shows RUNNING, and HA is enabled. The recovery fails 
> with validateHaMetadataExists throwing UpgradeFailureException.
> At that point the deployment gets stuck in a loop with status RECONCILING and 
> UpgradeFailureException thrown on each cycle. I've attached operator logs 
> showing this.
> *Proposed solution:* I think the fix would be to wrap 
> [AbstractFlinkDeploymentObserver.observeJmDeployment|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java#L155]
>  in an if-statement that checks that the job is not in a terminal state. 
> Happy to discuss and/or put up the two-line PR.
>  
> *For batch jobs:*
> The root error (full logs attached):
> {code:java}
> Caused by: org.apache.flink.runtime.rest.util.RestClientException: 
> [org.apache.flink.runtime.rest.handler.RestHandlerException: Checkpointing 
> has not been enabled.
>     at 
> org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler.createCheckpointingStatistics(CheckpointingStatisticsHandler.java:107){code}
>  # The job finishes successfully and the job status changes to FINISHED in 
> memory
>  # When observing the FlinkCluster, in 
> [AbstractFlinkService.getCheckpointInfo|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L570],
>  the operator successfully connects to the Flink REST API
>  # It tries to get checkpoint statistics via CheckpointingStatisticsHandler, 
> but the handler throws an error because checkpointing is not enabled. This is 
> actually an expected response since it's a BATCH job
> E.g. 
> [http://localhost:8081/jobs/job-id/checkpoints|http://localhost:8081/jobs/d12843a2a326d5268e550012e620cdca/checkpoints]
> {code:java}
> { "errors": [ "org.apache.flink.runtime.rest.handler.RestHandlerException: 
> Checkpointing has not been enabled.\n\tat 
> org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler.createCheckpointingStatistics(CheckpointingStatisticsHandler.java:107)\n\tat
>  
> org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler.handleCheckpointStatsRequest(CheckpointingStatisticsHandler.java:85)\n\tat
>  
> org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler.handleCheckpointStatsRequest(CheckpointingStatisticsHandler.java:59)\n\tat
>  
> org.apache.flink.runtime.rest.handler.job.checkpoints.AbstractCheckpointStatsHandler.lambda$handleRequest$1(AbstractCheckpointStatsHandler.java:89)\n\tat
>  
> java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)\n\tat
>  
> java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)\n\tat
>  
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat
>  java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat 
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)\n\tat
>  
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat
>  
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat
>  java.base/java.lang.Thread.run(Thread.java:829)\n" ] }{code}
>      4. The REST endpoint exists but returns the exception, which then bubbles 
> up to a ReconciliationException that is thrown again on every loop
>      5. The deployment is never cleaned up and never shows FINISHED in the CR
> *Proposed solution:* I think we should check in 
> [SnapshotObserver.observeSavepointStatus|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java#L80]
>  whether the job is a BATCH job. If it is, return early without observing 
> anything. Also happy to discuss and/or put up a PR.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
