If you are running a session cluster, then Flink will create a ConfigMap
for every submitted job. Unfortunately, these ConfigMaps are only cleaned
up when you shut down the cluster. This is a known limitation which we
want to fix soon [1, 2].
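
In the meantime you can remove the leftover ConfigMaps manually. A rough
sketch (the label selector is an assumption based on the labels Flink
attaches to its HA ConfigMaps; please verify with
"kubectl get configmap --show-labels" in your namespace first):

    # Delete the HA ConfigMaps belonging to cluster-id "flink-cluster"
    # in the given namespace. Only run this after the cluster has been
    # shut down, otherwise you break the ongoing leader election.
    kubectl delete configmap \
      --selector='app=flink-cluster,configmap-type=high-availability' \
      -n intel360-beta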
If you can help us with updating the documentation properly (e.g. which
role binding to use for the service account with minimal permissions),
then we would highly appreciate your help.

[1] https://issues.apache.org/jira/browse/FLINK-20695
[2] https://issues.apache.org/jira/browse/FLINK-21008

Cheers,
Till

On Tue, Feb 16, 2021 at 3:45 PM Omer Ozery <omeroz...@gmail.com> wrote:

> Hey guys,
> You are right, the documentation lacks this part, and Flink needs it to
> start.
> I'm not sure it 100% solved our problem, because it creates endless
> copies of the ConfigMaps with random IDs, and our jobs also can't be
> scheduled for some reason.
> I will investigate this further with Daniel and let you know.
> Also, the access granted by this document is broad, imprecise and
> cluster-wide (it uses the default "edit" ClusterRole), so when you
> create the PR, please make sure that whoever is in charge of the
> Flink-Kubernetes integration documents the exact permissions to create
> and attach to Flink's components.
>
> Thanks very much for your help!
> We will keep you updated.
> Omer
>
> On Tue, Feb 16, 2021 at 3:26 PM Till Rohrmann <trohrm...@apache.org>
> wrote:
>
>> Hi Omer,
>>
>> I think Matthias is right. The K8s HA services create and edit
>> ConfigMaps, hence they need the rights to do this. In the native K8s
>> documentation there is a section about how to create a service account
>> with the right permissions [1].
>>
>> I think that our K8s HA documentation currently lacks this part. I
>> will create a PR to update the documentation.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html#rbac
>>
>> Cheers,
>> Till
>>
>> On Mon, Feb 15, 2021 at 9:32 AM Matthias Pohl <matth...@ververica.com>
>> wrote:
>>
>>> I'm adding the Flink user ML to the conversation again.
>>>
>>> On Mon, Feb 15, 2021 at 8:18 AM Matthias Pohl <matth...@ververica.com>
>>> wrote:
>>>
>>>> Hi Omer,
>>>> thanks for sharing the configuration. You're right: using NFS for
>>>> HA's storageDir is fine.
>>>>
>>>> About the error message you're referring to: I haven't worked with
>>>> the HA k8s service yet, but RBAC is a good hint. Flink's native
>>>> Kubernetes documentation [1] points out that you can use a custom
>>>> service account. This one needs special permissions to start/stop
>>>> pods automatically (which does not apply in your case) but also to
>>>> access ConfigMaps. You might want to try setting the permission as
>>>> described in [1].
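>>>>
>>>> Untested, but based on the error further down, a namespaced Role
>>>> along these lines might already be enough (the names are
>>>> placeholders and the exact verb list is my assumption; the HA
>>>> services have to read, watch, create, edit and delete ConfigMaps):
>>>>
>>>> apiVersion: rbac.authorization.k8s.io/v1
>>>> kind: Role
>>>> metadata:
>>>>   name: flink-ha            # placeholder name
>>>>   namespace: intel360-beta
>>>> rules:
>>>>   # Flink's Kubernetes HA services only touch ConfigMaps.
>>>>   - apiGroups: [""]
>>>>     resources: ["configmaps"]
>>>>     verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
>>>> ---
>>>> apiVersion: rbac.authorization.k8s.io/v1
>>>> kind: RoleBinding
>>>> metadata:
>>>>   name: flink-ha
>>>>   namespace: intel360-beta
>>>> subjects:
>>>>   - kind: ServiceAccount
>>>>     name: default           # the service account your pods actually use
>>>>     namespace: intel360-beta
>>>> roleRef:
>>>>   apiGroup: rbac.authorization.k8s.io
>>>>   kind: Role
>>>>   name: flink-ha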
>>>>
>>>> Best,
>>>> Matthias
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html#rbac
>>>>
>>>> On Sun, Feb 14, 2021 at 7:16 PM Omer Ozery <omeroz...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hey Matthias,
>>>>> My name is Omer; I am Daniel's DevOps engineer, and I will
>>>>> elaborate on our Flink situation.
>>>>> These are our Flink resource definitions, as generated by the "helm
>>>>> template" command (minus the log4j and metrics configuration and
>>>>> some sensitive data):
>>>>>
>>>>> ---
>>>>> # Source: flink/templates/flink-configmap.yaml
>>>>> apiVersion: v1
>>>>> kind: ConfigMap
>>>>> metadata:
>>>>>   name: flink-config
>>>>>   labels:
>>>>>     app: flink
>>>>> data:
>>>>>   flink-conf.yaml: |
>>>>>     jobmanager.rpc.address: flink-jobmanager
>>>>>     jobmanager.rpc.port: 6123
>>>>>     jobmanager.execution.failover-strategy: region
>>>>>     jobmanager.memory.process.size: 8g
>>>>>     taskmanager.memory.process.size: 24g
>>>>>     taskmanager.memory.task.off-heap.size: 1g
>>>>>     taskmanager.numberOfTaskSlots: 4
>>>>>     queryable-state.proxy.ports: 6125
>>>>>     queryable-state.enable: true
>>>>>     blob.server.port: 6124
>>>>>     parallelism.default: 1
>>>>>     state.backend.incremental: true
>>>>>     state.backend: rocksdb
>>>>>     state.backend.rocksdb.localdir: /opt/flink/rocksdb
>>>>>     state.checkpoints.dir: file:///opt/flink/checkpoints
>>>>>     classloader.resolve-order: child-first
>>>>>     kubernetes.cluster-id: flink-cluster
>>>>>     kubernetes.namespace: intel360-beta
>>>>>     high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>>>>>     high-availability.storageDir: file:///opt/flink/recovery
>>>>> ---
>>>>> # Source: flink/templates/flink-service.yaml
>>>>> apiVersion: v1
>>>>> kind: Service
>>>>> metadata:
>>>>>   name: flink-jobmanager
>>>>>   labels: {}
>>>>> spec:
>>>>>   ports:
>>>>>     - name: http-ui
>>>>>       port: 8081
>>>>>       targetPort: http-ui
>>>>>     - name: tcp-rpc
>>>>>       port: 6123
>>>>>       targetPort: tcp-rpc
>>>>>     - name: tcp-blob
>>>>>       port: 6124
>>>>>       targetPort: tcp-blob
>>>>>   selector:
>>>>>     app: flink
>>>>>     component: jobmanager
>>>>> ---
>>>>> # Source: flink/templates/flink-deployment.yaml
>>>>> apiVersion: apps/v1
>>>>> kind: Deployment
>>>>> metadata:
>>>>>   name: flink-jobmanager
>>>>> spec:
>>>>>   replicas: 1
>>>>>   selector:
>>>>>     matchLabels:
>>>>>       app: flink
>>>>>       component: jobmanager
>>>>>   template:
>>>>>     metadata:
>>>>>       labels:
>>>>>         app: flink
>>>>>         component: jobmanager
>>>>>       annotations:
>>>>>         checksum/config: f22b7a7c8a618b2f5257476737883s6a76a88c25b37e455257e120d8e4ed941
>>>>>     spec:
>>>>>       containers:
>>>>>         - name: jobmanager
>>>>>           image: flink:1.12.1-scala_2.11-java11
>>>>>           args: [ "jobmanager" ]
>>>>>           ports:
>>>>>             - name: http-ui
>>>>>               containerPort: 8081
>>>>>             - name: tcp-rpc
>>>>>               containerPort: 6123
>>>>>             - name: tcp-blob
>>>>>               containerPort: 6124
>>>>>           resources: {}
>>>>>           # Environment Variables
>>>>>           env:
>>>>>             - name: ENABLE_CHECKPOINTING
>>>>>               value: "true"
>>>>>             - name: JOB_MANAGER_RPC_ADDRESS
>>>>>               value: "flink-jobmanager"
>>>>>           volumeMounts:
>>>>>             - name: flink-config
>>>>>               mountPath: /opt/flink/conf/flink-conf.yaml
>>>>>               subPath: flink-conf.yaml
>>>>>             # NFS mounts
>>>>>             - name: flink-checkpoints
>>>>>               mountPath: "/opt/flink/checkpoints"
>>>>>             - name: flink-recovery
>>>>>               mountPath: "/opt/flink/recovery"
>>>>>       volumes:
>>>>>         - name: flink-config
>>>>>           configMap:
>>>>>             name: flink-config
>>>>>         # NFS volumes
>>>>>         - name: flink-checkpoints
>>>>>           nfs:
>>>>>             server: "my-nfs-server.my-org"
>>>>>             path: "/my-shared-nfs-dir/flink/checkpoints"
>>>>>         - name: flink-recovery
>>>>>           nfs:
>>>>>             server: "my-nfs-server.my-org"
>>>>>             path: "/my-shared-nfs-dir/flink/recovery"
>>>>> ---
>>>>> # Source: flink/templates/flink-deployment.yaml
>>>>> apiVersion: apps/v1
>>>>> kind: Deployment
>>>>> metadata:
>>>>>   name: flink-taskmanager
>>>>> spec:
>>>>>   replicas: 7
>>>>>   selector:
>>>>>     matchLabels:
>>>>>       app: flink
>>>>>       component: taskmanager
>>>>>   template:
>>>>>     metadata:
>>>>>       labels:
>>>>>         app: flink
>>>>>         component: taskmanager
>>>>>       annotations:
>>>>>         checksum/config: f22b7a7c8a618b2f5257476737883s6a76a88c25b37e455257e120d8e4ed941
>>>>>     spec:
>>>>>       containers:
>>>>>         - name: taskmanager
>>>>>           image: flink:1.12.1-scala_2.11-java11
>>>>>           args: [ "taskmanager" ]
>>>>>           resources:
>>>>>             limits:
>>>>>               cpu: 6000m
>>>>>               memory: 24Gi
>>>>>             requests:
>>>>>               cpu: 6000m
>>>>>               memory: 24Gi
>>>>>           # Environment Variables
>>>>>           env:
>>>>>             - name: ENABLE_CHECKPOINTING
>>>>>               value: "true"
>>>>>             - name: JOB_MANAGER_RPC_ADDRESS
>>>>>               value: "flink-jobmanager"
>>>>>           volumeMounts:
>>>>>             - name: flink-config
>>>>>               mountPath: /opt/flink/conf/flink-conf.yaml
>>>>>               subPath: flink-conf.yaml
>>>>>             # NFS mounts
>>>>>             - name: flink-checkpoints
>>>>>               mountPath: "/opt/flink/checkpoints"
>>>>>             - name: flink-recovery
>>>>>               mountPath: "/opt/flink/recovery"
>>>>>       volumes:
>>>>>         - name: flink-config
>>>>>           configMap:
>>>>>             name: flink-config
>>>>>         # NFS volumes
>>>>>         - name: flink-checkpoints
>>>>>           nfs:
>>>>>             server: "my-nfs-server.my-org"
>>>>>             path: "/my-shared-nfs-dir/flink/checkpoints"
>>>>>         - name: flink-recovery
>>>>>           nfs:
>>>>>             server: "my-nfs-server.my-org"
>>>>>             path: "/my-shared-nfs-dir/flink/recovery"
>>>>> ---
>>>>> # Source: flink/templates/flink-ingress.yaml
>>>>> apiVersion: extensions/v1beta1
>>>>> kind: Ingress
>>>>> metadata:
>>>>>   name: jobmanager
>>>>> spec:
>>>>>   rules:
>>>>>     - host: my.flink.job.manager.url
>>>>>       http:
>>>>>         paths:
>>>>>           - path: /
>>>>>             backend:
>>>>>               serviceName: flink-jobmanager
>>>>>               servicePort: 8081
>>>>> ---
>>>>>
>>>>> As you can see, we are using the skeleton of the standalone
>>>>> configuration as documented here:
>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/standalone/kubernetes.html
>>>>> with some per-company configuration, obviously, but still within
>>>>> the scope of that document.
>>>>>
>>>>> On a normal day, without the HA configuration, everything works
>>>>> fine. When we try to configure Kubernetes HA using this document:
>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/ha/kubernetes_ha.html
>>>>> with the following parameters:
>>>>>
>>>>> kubernetes.cluster-id: flink-cluster
>>>>> high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>>>>> high-availability.storageDir: file:///opt/flink/recovery
>>>>>
>>>>> the jobmanager fails with the following error:
>>>>>
>>>>> 2021-02-14 16:57:19,103 ERROR
>>>>> io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector [] -
>>>>> Exception occurred while acquiring lock 'ConfigMapLock: default -
>>>>> flink-cluster-restserver-leader (54211907-eba9-47b1-813e-11f12ba89ccb)'
>>>>> io.fabric8.kubernetes.client.KubernetesClientException: Failure
>>>>> executing: GET at:
>>>>> https://10.96.0.1/api/v1/namespaces/default/configmaps/flink-cluster-restserver-leader.
>>>>> Message: Forbidden!Configured service account doesn't have access.
>>>>> Service account may have been revoked. configmaps
>>>>> "flink-cluster-restserver-leader" is forbidden: User
>>>>> "system:serviceaccount:intel360-beta:default" cannot get resource
>>>>> "configmaps" in API group "" in the namespace "default".
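>>>>>
>>>>> As a side note, the same denial can be reproduced outside of Flink
>>>>> with kubectl impersonation (assuming your own user is allowed to
>>>>> impersonate service accounts):
>>>>>
>>>>> # Ask the API server whether the pods' service account may read
>>>>> # ConfigMaps in the "default" namespace; this prints "no" while
>>>>> # the RBAC permission is missing.
>>>>> kubectl auth can-i get configmaps \
>>>>>   --as=system:serviceaccount:intel360-beta:default \
>>>>>   -n default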
>>>>>
>>>>> So we added this line as well (as you can see in the flink-config
>>>>> ConfigMap above):
>>>>>
>>>>> kubernetes.namespace: intel360-beta
>>>>>
>>>>> Although it is not part of the document, and I don't think Flink
>>>>> should be aware of the namespace it resides in (it damages the
>>>>> modularity of the upper configuration layers), we added it
>>>>> regardless, and then got the following error:
>>>>>
>>>>> 2021-02-14 17:00:57,086 ERROR
>>>>> io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector [] -
>>>>> Exception occurred while acquiring lock 'ConfigMapLock: intel360-beta -
>>>>> flink-cluster-restserver-leader (66180ce6-c62e-4ea4-9420-0a3134bef3d6)'
>>>>> io.fabric8.kubernetes.client.KubernetesClientException: Failure
>>>>> executing: GET at:
>>>>> https://10.96.0.1/api/v1/namespaces/intel360-beta/configmaps/flink-cluster-restserver-leader.
>>>>> Message: Forbidden!Configured service account doesn't have access.
>>>>> Service account may have been revoked. configmaps
>>>>> "flink-cluster-restserver-leader" is forbidden: User
>>>>> "system:serviceaccount:intel360-beta:default" cannot get resource
>>>>> "configmaps" in API group "" in the namespace "intel360-beta".
>>>>>
>>>>> which is basically the same error message, just directed at Flink's
>>>>> own namespace.
>>>>> My question is: do I need to add RBAC to Flink's service account? I
>>>>> got the impression from the official Flink documents and some blog
>>>>> responses that it is designed to function without any special
>>>>> permissions.
>>>>> If we do need RBAC, can you give an official documentation
>>>>> reference for the exact permissions?
>>>>>
>>>>> NOTE: as you can see, our flink-checkpoints and recovery locations
>>>>> point to a local directory mounted on an NFS share that all task
>>>>> managers and the job manager can access, since our infrastructure
>>>>> is bare-metal by design (although this one is hosted in AWS).
>>>>>
>>>>> Thanks in advance,
>>>>> Omer
>>>>>
>>>>>
>>>>> ---------- Forwarded message ---------
>>>>> From: Daniel Peled <daniel.peled.w...@gmail.com>
>>>>> Date: Sun, Feb 14, 2021 at 6:18 PM
>>>>> Subject: Fwd: Flink’s Kubernetes HA services - NOT working
>>>>> To: <omeroz...@gmail.com>
>>>>>
>>>>>
>>>>> ---------- Forwarded message ---------
>>>>> From: Matthias Pohl <matth...@ververica.com>
>>>>> Date: Thu, Feb 11, 2021 at 17:37
>>>>> Subject: Re: Flink’s Kubernetes HA services - NOT working
>>>>> To: Matthias Pohl <matth...@ververica.com>
>>>>> Cc: Daniel Peled <daniel.peled.w...@gmail.com>, user <user@flink.apache.org>
>>>>>
>>>>>
>>>>> One other thing: it looks like you've set
>>>>> high-availability.storageDir to a local path,
>>>>> file:///opt/flink/recovery. You should use a storage path that is
>>>>> accessible from all Flink cluster components (e.g. using S3). Only
>>>>> references are stored in Kubernetes ConfigMaps [1].
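>>>>>
>>>>> A minimal flink-conf sketch, assuming an S3 bucket (the bucket name
>>>>> and credentials are placeholders, and one of the flink-s3-fs-*
>>>>> filesystem plugins has to be available to Flink):
>>>>>
>>>>> # HA metadata must live somewhere every cluster component can reach;
>>>>> # only pointers to it end up in the ConfigMaps.
>>>>> high-availability.storageDir: s3://my-flink-bucket/recovery
>>>>> s3.access-key: <access-key>
>>>>> s3.secret-key: <secret-key>
>>>>>
>>>>> A shared NFS mount like yours should work as well, as long as the
>>>>> same path is mounted into every JobManager and TaskManager pod.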
>>>>>
>>>>> Best,
>>>>> Matthias
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/ha/kubernetes_ha.html#configuration
>>>>>
>>>>> On Wed, Feb 10, 2021 at 6:08 PM Matthias Pohl <matth...@ververica.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Daniel,
>>>>>> what's the exact configuration you used? Did you use the resource
>>>>>> definitions provided in the Standalone Flink on Kubernetes docs
>>>>>> [1]? Did you do certain things differently in comparison to the
>>>>>> documentation?
>>>>>>
>>>>>> Best,
>>>>>> Matthias
>>>>>>
>>>>>> [1]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html#appendix
>>>>>>
>>>>>> On Wed, Feb 10, 2021 at 1:31 PM Daniel Peled <daniel.peled.w...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hey,
>>>>>>>
>>>>>>> We are using standalone Flink on Kubernetes, and we have followed
>>>>>>> the instructions in the following link, "Kubernetes HA Services":
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/ha/kubernetes_ha.html
>>>>>>> We were unable to make it work. We are facing a lot of problems.
>>>>>>> For example, some of the jobs don't start, complaining that there
>>>>>>> are not enough slots available, although there are enough slots;
>>>>>>> it seems the job manager is NOT aware of all the task managers.
>>>>>>> In another scenario we were unable to run any job at all.
>>>>>>> The Flink dashboard is unresponsive and we get the error
>>>>>>> "flink service temporarily unavailable due to an ongoing leader
>>>>>>> election. please refresh".
>>>>>>> We believe we are missing some configurations.
>>>>>>> Are there any more detailed instructions?
>>>>>>> Any suggestions/tips?
>>>>>>> Attached is the log of the job manager in one of the attempts.
>>>>>>>
>>>>>>> Please give me some advice.
>>>>>>> BR,
>>>>>>> Danny