I was talking about the Flink Kubernetes operator and HA, not the individual
Flink jobs. But based on your answer it's probably not the cause.

Gyula

On Fri, 26 Apr 2024 at 21:15, Maxim Senin <mse...@cogility.com> wrote:

> Hi, Gyula. Thanks for the tips.
>
> All jobs are deployed in a single namespace, “flink”.
>
> Which replicas? The JM replicas are already set to 1; I also tried with TM
> replicas set to 1, but the same exception happens. We have only 1 instance of
> the operator (replicas=1) in this environment.
>
> The only workarounds I discovered are either
> a) disabling autoscaling for the failing job (the autoscaler scales the job
> to zero to “gracefully” stop it and then never starts it again), or
> b) disabling HA for the job, which fixes some jobs that keep restarting.
> A rough config sketch for both is below.
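>
> For reference, roughly what those two options look like in the
> FlinkDeployment spec (key names as I understand them from the operator docs,
> so please double-check against your version; values are placeholders, not
> our real config):
>
>   apiVersion: flink.apache.org/v1beta1
>   kind: FlinkDeployment
>   spec:
>     flinkConfiguration:
>       # a) turn the autoscaler off just for this job
>       #    (job.autoscaler.* prefix, as used since operator 1.7/1.8)
>       job.autoscaler.enabled: "false"
>       # b) or leave out the HA settings for this job, i.e. do not set:
>       # high-availability.type: kubernetes
>       # high-availability.storageDir: s3://<bucket>/flink-ha   # placeholder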
>
> And `Cannot rescale the given pointwise partitioner.` is also still a
> mystery.
>
> Thanks,
>
> Maxim
>
>
>
> From: Gyula Fóra <gyula.f...@gmail.com>
> Date: Friday, April 26, 2024 at 1:10 AM
> To: Maxim Senin <mse...@cogility.com>
> Cc: Maxim Senin via user <user@flink.apache.org>
> Subject: Re: [External] Exception during autoscaling operation - Flink
> 1.18/Operator 1.8.0
>
> Hi Maxim!
>
>
>
> Regarding the status update error, it could be related to a problem that
> we discovered recently with the Flink Operator HA, where during a namespace
> change both the leader and follower instances would start processing.
>
> It has been fixed in the current master by updating the JOSDK version to
> the one containing the fix.
>
>
>
> For details you can check:
>
> https://github.com/operator-framework/java-operator-sdk/issues/2341
>
>
> https://github.com/apache/flink-kubernetes-operator/commit/29a4aa5adf101920cbe1a3a9a178ff16f52c746d
>
>
>
> To resolve the issue (if it's caused by this), you could either
> cherry-pick the fix internally to the operator or reduce the replicas to 1
> if you are using HA.
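>
> If you installed the operator via the Helm chart, reducing the replicas is
> just a values change (the exact key may differ depending on how you deploy
> it); a minimal sketch:
>
>   # flink-kubernetes-operator Helm values
>   replicas: 1   # single operator instance, so leader/follower duplication
>                 # of reconciliations cannot happen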
>
>
>
> Cheers,
>
> Gyula
>
>
>
>
>
>
>
> On Fri, Apr 26, 2024 at 3:48 AM Maxim Senin via user <
> user@flink.apache.org> wrote:
>
> I have also seen this exception:
>
> o.a.f.k.o.o.JobStatusObserver
> [ERROR][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Job
> d0ac9da5959d8cc9a82645eeef6751a5 failed with error:
> java.util.concurrent.CompletionException:
> java.util.concurrent.CompletionException:
> java.lang.UnsupportedOperationException: Cannot rescale the given pointwise
> partitioner.
> Did you change the partitioner to forward or rescale?
> It may also help to add an explicit shuffle().
>
>     at org.apache.flink.util.concurrent.FutureUtils.lambda$switchExecutor$23(FutureUtils.java:1305)
>     at java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown Source)
>     at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown Source)
>     at java.base/java.util.concurrent.CompletableFuture$Completion.run(Unknown Source)
>     at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:451)
>     at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
>     at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:451)
>     at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:218)
>     at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
>     at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
>     at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
>     at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
>     at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
>
> I can’t find any information on how to interpret this. Please advise.
>
> Cheers,
> Maxim
>
> From: Maxim Senin via user <user@flink.apache.org>
> Date: Thursday, April 25, 2024 at 12:01 PM
> To: Maxim Senin via user <user@flink.apache.org>
> Subject: [External] Exception during autoscaling operation - Flink
> 1.18/Operator 1.8.0
>
> Hi.
>
> I already asked before but never got an answer. My observation is that the
> operator, after collecting some stats, tries to restart one of the
> deployments. This includes taking a savepoint (`takeSavepointOnUpgrade:
> true`, `upgradeMode: savepoint`) and “gracefully” shutting down the
> JobManager by “scaling it to zero” (setting replicas = 0 in the newly
> generated config).
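>
> For reference, the job part of the spec looks roughly like this (the jar
> path and parallelism are just illustrative placeholders, not our real
> values):
>
>   spec:
>     job:
>       jarURI: local:///opt/flink/usrlib/our-job.jar   # placeholder
>       parallelism: 2                                  # placeholder
>       state: running
>       upgradeMode: savepoint
>       # takeSavepointOnUpgrade: true is also set, as mentioned above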
>
> However, the deployment never comes back up, apparently due to this exception:
>
>
> 2024-04-25 17:20:52,920 mi.j.o.p.e.ReconciliationDispatcher [ERROR]
> [flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Error during error
> status handling.
>
> org.apache.flink.kubernetes.operator.exception.StatusConflictException:
> Status have been modified externally in version 50607043 Previous:
> {"jobStatus":{"jobName":"autoscaling
> test:attack-surface","jobId":"be93ad9b152c1f11696e971e6a638b63","state":"FINISHEDINFO\\n\"},\"mode\":\"native\"},\"resource_metadata\":…
>
>     at org.apache.flink.kubernetes.operator.utils.StatusRecorder.replaceStatus(StatusRecorder.java:161)
>     at org.apache.flink.kubernetes.operator.utils.StatusRecorder.patchAndCacheStatus(StatusRecorder.java:97)
>     at org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils.toErrorStatusUpdateControl(ReconciliationUtils.java:438)
>     at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.updateErrorStatus(FlinkDeploymentController.java:209)
>     at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.updateErrorStatus(FlinkDeploymentController.java:57)
>     at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleErrorStatusHandler(ReconciliationDispatcher.java:194)
>     at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:123)
>     at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:91)
>     at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:64)
>     at io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:417)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>     at java.base/java.lang.Thread.run(Unknown Source)
>
> 2024-04-25 17:20:52,925 mi.j.o.p.e.ReconciliationDispatcher [ERROR]
> [flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Error during event
> processing ExecutionScope{ resource id: ResourceID{name=
> 'f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6', namespace='flink'}, version:
> 50606957} failed.
>
> org.apache.flink.kubernetes.operator.exception.ReconciliationException:
> org.apache.flink.kubernetes.operator.exception.StatusConflictException:
> Status have been modified externally in version 50607043 Previous:
> {"jobStatus":{"jobName":"autoscaling
> test:attack-surface","jobId":"be93ad9b152c1f11696e971e6a638b63","state":"FINISHED",
>
>
> Caused by:
> org.apache.flink.kubernetes.operator.exception.StatusConflictException:
> Status have been modified externally in version 50607043 Previous:
> {"jobStatus":{"jobName":"autoscaling
> test:attack-surface","jobId":"be93ad9b152c1f11696e971e6a638b63","state":"FINISHED
>
>     at org.apache.flink.kubernetes.operator.utils.StatusRecorder.replaceStatus(StatusRecorder.java:161)
>     at org.apache.flink.kubernetes.operator.utils.StatusRecorder.patchAndCacheStatus(StatusRecorder.java:97)
>     at org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:175)
>     at org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:63)
>     at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.restoreJob(AbstractJobReconciler.java:279)
>     at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.reconcileSpecChange(AbstractJobReconciler.java:156)
>     at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:171)
>     at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:145)
>     ... 13 more
>
>
> How to fix this? Why is the deployment not coming back up after this
> exception? Is there a configuration property to set the number of retries?
>
> Thanks,
> Maxim
>
>
