Hi Flink Kubernetes Operator Community,

I hope this is the right way to report an issue with the Apache Flink 
Kubernetes Operator. Some of our streaming job clusters end up in a 
terminated state because the operator does not behave as expected. The 
teardown of the Flink cluster by the operator does not complete within the 
default timeout of 1 minute. The operator then proceeds and tries to create a 
fresh cluster, which fails because parts of the old cluster still exist. Next, 
it tries to fully remove the cluster, including the HA metadata. From that 
point on it is stuck in an error loop that requires manual recovery, since the 
HA metadata is missing. At the very bottom of this mail you can find a 
condensed log, which hopefully gives a more detailed impression of the problem.

The current workaround is to increase the 
"kubernetes.operator.resource.cleanup.timeout" [0] to 10 minutes. Time will 
tell whether this workaround fixes the problem for us.

The main problem I see is that the method 
AbstractFlinkService.waitForClusterShutdown(...) [1] does not handle a timeout 
at all. Please correct me if I missed a detail, but this is how we experience 
the problem. If the service, the jobmanagers, or the taskmanagers survive the 
cleanup timeout (of 1 minute), the operator seems to proceed as if the 
entities had been removed properly. To me this does not look right. From my 
point of view, at least an error should be logged.
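
To illustrate the behaviour I would expect, here is a minimal, self-contained 
sketch. It is not the operator's code: the class ShutdownWaiter, the exception 
ShutdownTimeoutException and the resourcesStillExist check are hypothetical 
names I made up for this mail. The real method would query Kubernetes for the 
service and the deployments instead. The sketch polls until the resources are 
gone, logs the elapsed time from a monotonic clock, logs success, and fails 
loudly when the timeout is reached instead of continuing.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

/** Hypothetical sketch of a shutdown wait that treats a timeout as an error. */
final class ShutdownWaiter {

    /** Raised when resources still exist once the cleanup timeout has elapsed. */
    static final class ShutdownTimeoutException extends RuntimeException {
        ShutdownTimeoutException(String message) {
            super(message);
        }
    }

    /**
     * Polls until resourcesStillExist returns false.
     *
     * @return the waited time in milliseconds
     * @throws ShutdownTimeoutException if the resources outlive the timeout
     */
    static long waitForShutdown(
            BooleanSupplier resourcesStillExist, Duration timeout, Duration pollInterval) {
        long start = System.nanoTime();
        while (resourcesStillExist.getAsBoolean()) {
            long elapsedNanos = System.nanoTime() - start;
            if (elapsedNanos >= timeout.toNanos()) {
                // Do not pretend the cluster is gone: surface the timeout to the caller.
                String msg = "Cluster resources still exist after "
                        + timeout.toMillis() + " ms, aborting redeployment";
                System.err.println("ERROR " + msg);
                throw new ShutdownTimeoutException(msg);
            }
            System.out.printf(
                    "Waiting for cluster shutdown... (%ds)%n",
                    Duration.ofNanos(elapsedNanos).getSeconds());
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for shutdown", e);
            }
        }
        long waitedMillis = Duration.ofNanos(System.nanoTime() - start).toMillis();
        System.out.printf("Cluster shutdown completed after %d ms.%n", waitedMillis);
        return waitedMillis;
    }
}
```

With something along these lines, a caller could catch the timeout and retry 
the cleanup later rather than deploying on top of a deployment that is still 
being deleted. Whether throwing or only logging an error is the better choice 
for the operator is of course up to you.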

Additionally, the current logging makes it difficult to analyse the problem and 
to be notified about the timeout. The following things could possibly be 
improved or implemented:

- Successful removal of the entities should be logged.
- Timing out isn't logged (an error should probably be logged here).
- For some reason the logging of the waited seconds is incomplete (L944, 
  further analysis needed).

We use the following Flink and Operator versions:

Flink Image: flink:1.17.1 (from Dockerhub)
Operator Version: 1.6.1

I hope this description is sufficient to get in touch and discuss the 
matter. I'm happy to provide additional information or, with some guidance, 
a patch to resolve the issue.

Thanks for your work on the Operator. It is highly appreciated!

Cheers,
Niklas


[0] 
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/
[1] 
https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L903


#############################################################################################
# The job in the cluster failed
Event  | Info    | JOBSTATUSCHANGED | Job status changed from RUNNING to FAILED
Stopping failed Flink job...
Status | Error   | FAILED          | 
{"type":"org.apache.flink.util.SerializedThrowable","message":
"org.apache.flink.runtime.JobException: Recovery is suppressed by 
FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=5, 
backoffTimeMS=30000)","additionalMetadata":{},"throwableList":[{"type":
"org.apache.flink.util.SerializedThrowable","message":
"org.apache.flink.util.FlinkExpectedException: The TaskExecutor is shutting 
down.","additionalMetadata":{}}]}
Deleting JobManager deployment while preserving HA metadata.
Deleting cluster with Foreground propagation
Waiting for cluster shutdown... (10s)
Waiting for cluster shutdown... (30s)
Waiting for cluster shutdown... (40s)
Waiting for cluster shutdown... (45s)
Waiting for cluster shutdown... (50s)
Resubmitting Flink job...
Cluster shutdown completed.
Deploying application cluster
Event  | Info    | SUBMIT          | Starting deployment
Submitting application in 'Application Mode
Deploying application cluster
...
Event  | Warning | CLUSTERDEPLOYMENTEXCEPTION | Could not create Kubernetes 
cluster "<cluster>"
Status | Error   | FAILED          | 
{"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException",
"message":"org.apache.flink.client.deployment.ClusterDeploymentException: 
Could not create Kubernetes cluster \"<cluster>\".","additionalMetadata":{},
"throwableList":[{"type":"org.apache.flink.client.deployment.ClusterDeploymentException",
"message":"Could not create Kubernetes cluster \"<cluster>\".",
"additionalMetadata":{}},
{"type":"org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClientException",
"message":"Failure executing: POST at: 
https://10.96.0.1/apis/apps/v1/namespaces/dapfs-basic/deployments. Message: 
object is being deleted: deployments.apps \"<cluster>\" already exists. 
Received status: Status(apiVersion=v1, code=409, 
details=StatusDetails(causes=[], group=apps, kind=deployments, name=<cluster>, 
retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status, 
message=object is being deleted: deployments.apps \"<cluster>\" already 
exists, metadata=ListMeta(_continue=null, remainingItemCount=null, 
resourceVersion=null, selfLink=null, additionalProperties={}), 
reason=AlreadyExists, status=Failure, 
additionalProperties={}).","additionalMetadata":{}}]}
...
Event  | Warning | RECOVERDEPLOYMENT | Recovering lost deployment
Deploying application cluster requiring last-state from HA metadata
Event  | Info    | SUBMIT          | Starting deployment
Flink recovery failed
Event  | Warning | RESTOREFAILED   | HA metadata not available to restore from 
last state. It is possible that the job has finished or terminally failed, or 
the configmaps have been deleted. Manual restore required.
# Here the deadlock / endless loop starts
#############################################################################################
