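[Problem description]

Using Flink Kubernetes Operator v1.1.0, I deployed a Flink Session Cluster in high-availability mode (two JobManagers),
then deployed an example job through a FlinkSessionJob resource. The job deploys successfully on some attempts and fails on others.

The containers show the error logs included further down.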



[Steps]

Deploy the cluster:



apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-cluster-1jm-checkpoint
spec:
  image: flink:1.15
  flinkVersion: v1_15
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
    state.savepoints.dir: file:///flink-data/savepoints
    state.checkpoints.dir: file:///flink-data/checkpoints
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: file:///flink-data/ha
    state.checkpoints.num-retained: "10"
  serviceAccount: flink
  ingress:
    template: "{{name}}.{{namespace}}.k8s.rf.io"
  jobManager:
    replicas: 2
  podTemplate:
    spec:
      nodeSelector:
        kubernetes.io/hostname: k8s17
      containers:
        - name: flink-main-container
          volumeMounts:
            - mountPath: /flink-data
              name: flink-volume
      volumes:
        - name: flink-volume
          hostPath:
            # directory location on host
            path: /tmp/flink
            # this field is optional
            type: Directory



Deploy the job:



apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: flink-job-1jm-checkpoint
spec:
  deploymentName: flink-cluster-1jm-checkpoint
  job:
    # the custom-built operator image includes the examples jar
    jarURI: file:///opt/flink/examples/streaming/StateMachineExample.jar
    entryClass: org.apache.flink.streaming.examples.statemachine.StateMachineExample
    parallelism: 1
    upgradeMode: savepoint





[Related logs]

  1. Operator log from a run where the job deployed successfully and ran:

2022-10-27 03:38:07,952 o.a.f.k.o.s.FlinkService [ERROR][flink/flink-job-1jm-checkpoint] Failed to delete the jar: 06c5b90e-865b-4a88-bf30-515ec122e51e_StateMachineExample.jar.
java.util.concurrent.ExecutionException: org.apache.flink.runtime.rest.util.RestClientException: [org.apache.flink.runtime.rest.handler.RestHandlerException: File 06c5b90e-865b-4a88-bf30-515ec122e51e_StateMachineExample.jar does not exist in /tmp/flink-web-e78590cd-656d-4f6f-a16a-9e4e994b44a6/flink-web-upload.
    at org.apache.flink.runtime.webmonitor.handlers.JarDeleteHandler.lambda$handleRequest$0(JarDeleteHandler.java:80)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
]
    at java.base/java.util.concurrent.CompletableFuture.reportGet(Unknown Source

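One JobManager pod does not contain the file /tmp/flink-web-e78590cd-656d-4f6f-a16a-9e4e994b44a6/flink-web-upload/06c5b90e-865b-4a88-bf30-515ec122e51e_StateMachineExample.jar; the file is in the other JM's pod instead. That other JM does not appear to be the leader, though, because the checkpoint-related log output shows up in the first JM.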





  2. Operator log from a run where the job failed to deploy:

2022-10-27 10:12:09,749 i.j.o.p.e.ReconciliationDispatcher [ERROR][flink/flink-job-1jm-checkpoint] Error during event processing ExecutionScope{ resource id: ResourceID{name='flink-job-1jm-checkpoint', namespace='flink'}, version: 120505701} failed.
org.apache.flink.kubernetes.operator.exception.ReconciliationException: org.apache.flink.util.FlinkRuntimeException: java.util.concurrent.ExecutionException: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
java.util.concurrent.CompletionException: org.apache.flink.runtime.rest.handler.RestHandlerException: Jar file /tmp/flink-web-69209c8b-6ed5-45f2-aa99-4bc41efb7983/flink-web-upload/d7df9d81-2cfb-4642-a450-e9080a30db12_StateMachineExample.jar does not exist
    at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toPackagedProgram(JarHandlerUtils.java:172)
    at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.applyToConfiguration(JarHandlerUtils.java:141)
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleRequest(JarRunHandler.java:100)
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleRequest(JarRunHandler.java:57)
    at org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:83)
    at org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:195)
    at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)
    at java.base/java.util.Optional.ifPresent(Unknown Source)

Similar log entries also appear in the leader JobManager pod.



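[Analysis]

In HA mode, is there any need to run more than one JM replica (the replica count is configurable)? When there are multiple JMs and one of them is the leader, how does the other JM end up receiving the uploaded jar?

Case 1 looks as if the operator submitted the jar to one JM and that JM started the job; the second JM then became the leader, and the operator connected to the second JM when deleting the jar, so only the jar deletion failed.

Case 2 looks as if the operator submitted the jar to one JM while the other JM was the leader, so the jar could not be found at deployment time.

If one JM is the leader, does access through the Web UI always reach that pod, or can it also reach the other JM's pod?

When I upload a jar through the Web UI, the jar shows up on some automatic refreshes and is missing on others. It looks as if the two pods are being reached at random.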
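
To make the suspected mechanism concrete, here is a toy Python model. It is not Flink or operator code. It rests on two assumptions that this post does not confirm: that every REST request is routed independently to a randomly chosen JM pod, and that an uploaded jar is stored only in the local upload directory of the pod that received it. The function name `simulate` and its parameters are hypothetical.

```python
import random


def simulate(num_jms: int, trials: int, seed: int = 0) -> float:
    """Return the fraction of trials in which the 'run' request lands on
    the same JM pod that received the jar upload (toy model only)."""
    rng = random.Random(seed)
    successes = 0
    jar = "StateMachineExample.jar"
    for _ in range(trials):
        # Assumption: each pod has its own private upload directory.
        upload_dirs = [set() for _ in range(num_jms)]

        # Assumption: the upload request is routed to a random pod.
        upload_pod = rng.randrange(num_jms)
        upload_dirs[upload_pod].add(jar)

        # Assumption: the run request is routed independently of the upload.
        run_pod = rng.randrange(num_jms)
        if jar in upload_dirs[run_pod]:
            successes += 1
    return successes / trials


if __name__ == "__main__":
    for replicas in (1, 2):
        rate = simulate(replicas, trials=10_000)
        print(f"replicas={replicas}: run request found the jar in {rate:.0%} of trials")
```

Under these assumptions a single JM always finds the jar, while two JMs find it in only about half of the attempts, which would match the "sometimes works, sometimes fails" behaviour and the jar flickering in the Web UI. Whether the requests are really distributed this way is exactly the open question here.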




