If you are running a session cluster, then Flink will create a config map
for every submitted job. These config maps will unfortunately only be
cleaned up when you shut down the cluster. This is a known limitation which
we want to fix soon [1, 2].
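Until then, the only workaround is to clean up the leftover ConfigMaps
manually. Just as a sketch (the label selector is from memory, so please
double-check the labels on your ConfigMaps first, and only delete entries
for jobs that are no longer running):

# list the HA ConfigMaps belonging to the session cluster
kubectl -n <namespace> get configmaps \
  --selector='app=<cluster-id>,configmap-type=high-availability'
# delete them once the corresponding jobs are gone
kubectl -n <namespace> delete configmaps \
  --selector='app=<cluster-id>,configmap-type=high-availability'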

If you can help us update the documentation properly (e.g. which
role binding to use for the service account with minimal permissions),
we would highly appreciate it.
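To give an idea of what I mean, a namespace-scoped Role/RoleBinding along
the following lines is roughly what the documentation should recommend
instead of the cluster-wide 'edit' binding. Please treat the verb list as
an unverified assumption for now; pinning it down is exactly where help
would be appreciated:

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: flink-ha-configmaps
  namespace: <namespace>
rules:
- apiGroups: [""]
  resources: ["configmaps"]
  verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: flink-ha-configmaps
  namespace: <namespace>
subjects:
- kind: ServiceAccount
  name: default            # or a dedicated service account for Flink
  namespace: <namespace>
roleRef:
  kind: Role
  name: flink-ha-configmaps
  apiGroup: rbac.authorization.k8s.io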

[1] https://issues.apache.org/jira/browse/FLINK-20695
[2] https://issues.apache.org/jira/browse/FLINK-21008

Cheers,
Till

On Tue, Feb 16, 2021 at 3:45 PM Omer Ozery <omeroz...@gmail.com> wrote:

> Hey guys,
> You are right, the documentation lacks this part, and Flink needs it
> to start.
> I'm not sure it 100% solved our problem, because Flink creates endless
> copies of the ConfigMaps with random IDs, and our jobs also can't be
> scheduled for some reason.
> I will investigate this further with Daniel and let you know.
> Also, the access granted by that document is broad, imprecise and
> cluster-wide (it uses the default 'edit' ClusterRole), so when you create a
> PR, please make sure that whoever is in charge of the Flink-Kubernetes
> integration documents the exact permissions to create and attach to Flink's
> components.
>
> Thanks very much for your help!
> We will keep you updated.
> Omer
>
> On Tue, Feb 16, 2021 at 3:26 PM Till Rohrmann <trohrm...@apache.org>
> wrote:
>
>> Hi Omer,
>>
>> I think Matthias is right. The K8s HA services create and edit config
>> maps. Hence they need the rights to do this. In the native K8s
>> documentation there is a section about how to create a service account with
>> the right permissions [1].
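>> From memory, the commands in that section boil down to something like the
>> following (please double-check against [1]; note that it uses the broad
>> 'edit' ClusterRole rather than a minimal set of permissions):
>>
>> kubectl create serviceaccount flink-service-account -n <namespace>
>> kubectl create clusterrolebinding flink-role-binding-flink \
>>   --clusterrole=edit \
>>   --serviceaccount=<namespace>:flink-service-account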
>>
>> I think that our K8s HA documentation currently lacks this part. I will
>> create a PR to update the documentation.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html#rbac
>>
>> Cheers,
>> Till
>>
>> On Mon, Feb 15, 2021 at 9:32 AM Matthias Pohl <matth...@ververica.com>
>> wrote:
>>
>>> I'm adding the Flink user ML to the conversation again.
>>>
>>> On Mon, Feb 15, 2021 at 8:18 AM Matthias Pohl <matth...@ververica.com>
>>> wrote:
>>>
>>>> Hi Omer,
>>>> thanks for sharing the configuration. You're right: Using NFS for HA's
>>>> storageDir is fine.
>>>>
>>>> About the error message you're referring to: I haven't worked with the
>>>> k8s HA service yet, but RBAC is a good hint. Flink's native Kubernetes
>>>> documentation [1] points out that you can use a custom service account.
>>>> That service account needs special permissions to start/stop pods
>>>> automatically (which does not apply in your case), but also to access
>>>> ConfigMaps. You might want to try setting the permissions as described
>>>> in [1].
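>>>> In a standalone deployment like yours, that would mean attaching the
>>>> service account to the pod templates of both Deployments, roughly like
>>>> this ('flink-service-account' is just a placeholder name here):
>>>>
>>>>   template:
>>>>     spec:
>>>>       # service account that carries the ConfigMap permissions;
>>>>       # the rest of the pod spec stays unchanged
>>>>       serviceAccountName: flink-service-account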
>>>>
>>>> Best,
>>>> Matthias
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html#rbac
>>>>
>>>> On Sun, Feb 14, 2021 at 7:16 PM Omer Ozery <omeroz...@gmail.com> wrote:
>>>>
>>>>> Hey Matthias.
>>>>> My name is Omer, I am Daniel's DevOps engineer; I will elaborate on our
>>>>> Flink situation.
>>>>> These are our Flink resource definitions, as generated by the
>>>>> helm template command (minus the log4j and metrics configuration and
>>>>> some sensitive data):
>>>>> ---
>>>>> # Source: flink/templates/flink-configmap.yaml
>>>>> apiVersion: v1
>>>>> kind: ConfigMap
>>>>> metadata:
>>>>>   name: flink-config
>>>>>   labels:
>>>>>     app: flink
>>>>> data:
>>>>>   flink-conf.yaml: |
>>>>>     jobmanager.rpc.address: flink-jobmanager
>>>>>     jobmanager.rpc.port: 6123
>>>>>     jobmanager.execution.failover-strategy: region
>>>>>     jobmanager.memory.process.size: 8g
>>>>>     taskmanager.memory.process.size: 24g
>>>>>     taskmanager.memory.task.off-heap.size: 1g
>>>>>     taskmanager.numberOfTaskSlots: 4
>>>>>     queryable-state.proxy.ports: 6125
>>>>>     queryable-state.enable: true
>>>>>     blob.server.port: 6124
>>>>>     parallelism.default: 1
>>>>>     state.backend.incremental: true
>>>>>     state.backend: rocksdb
>>>>>     state.backend.rocksdb.localdir: /opt/flink/rocksdb
>>>>>     state.checkpoints.dir: file:///opt/flink/checkpoints
>>>>>     classloader.resolve-order: child-first
>>>>>     kubernetes.cluster-id: flink-cluster
>>>>>     kubernetes.namespace: intel360-beta
>>>>>     high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>>>>>     high-availability.storageDir: file:///opt/flink/recovery
>>>>>
>>>>> ---
>>>>> # Source: flink/templates/flink-service.yaml
>>>>> apiVersion: v1
>>>>> kind: Service
>>>>> metadata:
>>>>>   name: flink-jobmanager
>>>>>   labels:
>>>>>     {}
>>>>> spec:
>>>>>   ports:
>>>>>   - name: http-ui
>>>>>     port: 8081
>>>>>     targetPort: http-ui
>>>>>   - name: tcp-rpc
>>>>>     port: 6123
>>>>>     targetPort: tcp-rpc
>>>>>   - name: tcp-blob
>>>>>     port: 6124
>>>>>     targetPort: tcp-blob
>>>>>   selector:
>>>>>     app: flink
>>>>>     component: jobmanager
>>>>> ---
>>>>> # Source: flink/templates/flink-deployment.yaml
>>>>> apiVersion: apps/v1
>>>>> kind: Deployment
>>>>> metadata:
>>>>>   name: flink-jobmanager
>>>>> spec:
>>>>>   replicas: 1
>>>>>   selector:
>>>>>     matchLabels:
>>>>>       app: flink
>>>>>       component: jobmanager
>>>>>   template:
>>>>>     metadata:
>>>>>       labels:
>>>>>         app: flink
>>>>>         component: jobmanager
>>>>>       annotations:
>>>>>         checksum/config: f22b7a7c8a618b2f5257476737883s6a76a88c25b37e455257e120d8e4ed941
>>>>>     spec:
>>>>>       containers:
>>>>>       - name: jobmanager
>>>>>         image: flink:1.12.1-scala_2.11-java11
>>>>>         args: [ "jobmanager" ]
>>>>>         ports:
>>>>>         - name: http-ui
>>>>>           containerPort: 8081
>>>>>         - name: tcp-rpc
>>>>>           containerPort: 6123
>>>>>         - name: tcp-blob
>>>>>           containerPort: 6124
>>>>>         resources:
>>>>>           {}
>>>>>         # Environment Variables
>>>>>         env:
>>>>>         - name: ENABLE_CHECKPOINTING
>>>>>           value: "true"
>>>>>         - name: JOB_MANAGER_RPC_ADDRESS
>>>>>           value: "flink-jobmanager"
>>>>>         volumeMounts:
>>>>>         - name: flink-config
>>>>>           mountPath: /opt/flink/conf/flink-conf.yaml
>>>>>           subPath: flink-conf.yaml
>>>>>         # NFS mounts
>>>>>         - name: flink-checkpoints
>>>>>           mountPath: "/opt/flink/checkpoints"
>>>>>         - name: flink-recovery
>>>>>           mountPath: "/opt/flink/recovery"
>>>>>       volumes:
>>>>>       - name: flink-config
>>>>>         configMap:
>>>>>           name: flink-config
>>>>>       # NFS volumes
>>>>>       - name: flink-checkpoints
>>>>>         nfs:
>>>>>           server: "my-nfs-server.my-org"
>>>>>           path: "/my-shared-nfs-dir/flink/checkpoints"
>>>>>       - name: flink-recovery
>>>>>         nfs:
>>>>>           server: "my-nfs-server.my-org"
>>>>>           path: "/my-shared-nfs-dir/flink/recovery"
>>>>> ---
>>>>> # Source: flink/templates/flink-deployment.yaml
>>>>> apiVersion: apps/v1
>>>>> kind: Deployment
>>>>> metadata:
>>>>>   name: flink-taskmanager
>>>>> spec:
>>>>>   replicas: 7
>>>>>   selector:
>>>>>     matchLabels:
>>>>>       app: flink
>>>>>       component: taskmanager
>>>>>   template:
>>>>>     metadata:
>>>>>       labels:
>>>>>         app: flink
>>>>>         component: taskmanager
>>>>>       annotations:
>>>>>         checksum/config: f22b7a7c8a618b2f5257476737883s6a76a88c25b37e455257e120d8e4ed941
>>>>>     spec:
>>>>>       containers:
>>>>>       - name: taskmanager
>>>>>         image: flink:1.12.1-scala_2.11-java11
>>>>>         args: [ "taskmanager" ]
>>>>>         resources:
>>>>>           limits:
>>>>>             cpu: 6000m
>>>>>             memory: 24Gi
>>>>>           requests:
>>>>>             cpu: 6000m
>>>>>             memory: 24Gi
>>>>>         # Environment Variables
>>>>>         env:
>>>>>         - name: ENABLE_CHECKPOINTING
>>>>>           value: "true"
>>>>>         - name: JOB_MANAGER_RPC_ADDRESS
>>>>>           value: "flink-jobmanager"
>>>>>         volumeMounts:
>>>>>         - name: flink-config
>>>>>           mountPath: /opt/flink/conf/flink-conf.yaml
>>>>>           subPath: flink-conf.yaml
>>>>>         # NFS mounts
>>>>>         - name: flink-checkpoints
>>>>>           mountPath: "/opt/flink/checkpoints"
>>>>>         - name: flink-recovery
>>>>>           mountPath: "/opt/flink/recovery"
>>>>>       volumes:
>>>>>       - name: flink-config
>>>>>         configMap:
>>>>>           name: flink-config
>>>>>       # NFS volumes
>>>>>       - name: flink-checkpoints
>>>>>         nfs:
>>>>>           server: "my-nfs-server.my-org"
>>>>>           path: "/my-shared-nfs-dir/flink/checkpoints"
>>>>>       - name: flink-recovery
>>>>>         nfs:
>>>>>           server: "my-nfs-server.my-org"
>>>>>           path: "/my-shared-nfs-dir/flink/recovery"
>>>>> ---
>>>>> # Source: flink/templates/flink-ingress.yaml
>>>>> apiVersion: extensions/v1beta1
>>>>> kind: Ingress
>>>>> metadata:
>>>>>   name: jobmanager
>>>>> spec:
>>>>>   rules:
>>>>>     - host: my.flink.job.manager.url
>>>>>       http:
>>>>>         paths:
>>>>>           - path: /
>>>>>             backend:
>>>>>               serviceName: flink-jobmanager
>>>>>               servicePort: 8081
>>>>> ---
>>>>>
>>>>> As you can see, we are using the skeleton of the standalone
>>>>> configuration as documented here:
>>>>>
>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/standalone/kubernetes.html
>>>>> with some company-specific configuration obviously, but still within the
>>>>> scope of this document.
>>>>>
>>>>> On a normal, beautiful day, without the HA configuration, everything
>>>>> works fine.
>>>>> When trying to configure Kubernetes HA using this document:
>>>>>
>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/ha/kubernetes_ha.html
>>>>> with the following parameters:
>>>>>     kubernetes.cluster-id: flink-cluster
>>>>>     high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>>>>>     high-availability.storageDir: file:///opt/flink/recovery
>>>>>
>>>>> the jobmanager fails with the following error:
>>>>> 2021-02-14 16:57:19,103 ERROR
>>>>> io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector [] -
>>>>> Exception occurred while acquiring lock 'ConfigMapLock: default -
>>>>> flink-cluster-restserver-leader (54211907-eba9-47b1-813e-11f12ba89ccb)'
>>>>> io.fabric8.kubernetes.client.KubernetesClientException: Failure
>>>>> executing: GET at:
>>>>> https://10.96.0.1/api/v1/namespaces/default/configmaps/flink-cluster-restserver-leader.
>>>>> Message: Forbidden!Configured service account doesn't have access. Service
>>>>> account may have been revoked. configmaps 
>>>>> "flink-cluster-restserver-leader"
>>>>> is forbidden: User "system:serviceaccount:intel360-beta:default" cannot 
>>>>> get
>>>>> resource "configmaps" in API group "" in the namespace "default".
>>>>>
>>>>> So we added this line as well (as you can see in the flink-config
>>>>> ConfigMap above):
>>>>> kubernetes.namespace: intel360-beta
>>>>> Although it is not part of the document, and I don't think Flink should
>>>>> need to be aware of the namespace it resides in (it hurts the modularity
>>>>> of the upper configuration layers), we added it regardless and then got
>>>>> the following error:
>>>>>
>>>>> 2021-02-14 17:00:57,086 ERROR
>>>>> io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector [] -
>>>>> Exception occurred while acquiring lock 'ConfigMapLock: intel360-beta -
>>>>> flink-cluster-restserver-leader (66180ce6-c62e-4ea4-9420-0a3134bef3d6)'
>>>>> io.fabric8.kubernetes.client.KubernetesClientException: Failure
>>>>> executing: GET at:
>>>>> https://10.96.0.1/api/v1/namespaces/intel360-beta/configmaps/flink-cluster-restserver-leader.
>>>>> Message: Forbidden!Configured service account doesn't have access. Service
>>>>> account may have been revoked. configmaps 
>>>>> "flink-cluster-restserver-leader"
>>>>> is forbidden: User "system:serviceaccount:intel360-beta:default" cannot 
>>>>> get
>>>>> resource "configmaps" in API group "" in the namespace "intel360-beta".
>>>>>
>>>>> which is basically the same error message, just pointing at
>>>>> Flink's namespace.
>>>>> My question is: do I need to add RBAC to Flink's service account? I got
>>>>> the impression from the official Flink documentation and some blog
>>>>> responses that it is designed to function without any special
>>>>> permissions.
>>>>> If we do need RBAC, can you give an official documentation reference
>>>>> for the exact permissions?
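>>>>> For reference, we can double-check what the default service account is
>>>>> currently allowed to do with something like:
>>>>>
>>>>> kubectl auth can-i get configmaps \
>>>>>   --as=system:serviceaccount:intel360-beta:default -n intel360-beta
>>>>> kubectl auth can-i create configmaps \
>>>>>   --as=system:serviceaccount:intel360-beta:default -n intel360-beta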
>>>>>
>>>>> NOTE: as you can see, our flink-checkpoints and recovery locations point
>>>>> to a local directory backed by an NFS share mounted on all task managers
>>>>> and the job manager, since our infrastructure is bare-metal by design
>>>>> (although this one is hosted in AWS).
>>>>>
>>>>> thanks in advance
>>>>> Omer
>>>>>
>>>>>
>>>>> ---------- Forwarded message ---------
>>>>> From: Daniel Peled <daniel.peled.w...@gmail.com>
>>>>> Date: Sun, Feb 14, 2021 at 6:18 PM
>>>>> Subject: Fwd: Flink’s Kubernetes HA services - NOT working
>>>>> To: <omeroz...@gmail.com>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> ---------- Forwarded message ---------
>>>>> From: Matthias Pohl <matth...@ververica.com>
>>>>> Date: Thu, Feb 11, 2021 at 17:37
>>>>> Subject: Re: Flink’s Kubernetes HA services - NOT working
>>>>> To: Matthias Pohl <matth...@ververica.com>
>>>>> Cc: Daniel Peled <daniel.peled.w...@gmail.com>, user <
>>>>> user@flink.apache.org>
>>>>>
>>>>>
>>>>> One other thing: It looks like you've set high-availability.storageDir
>>>>> to a local path file:///opt/flink/recovery. You should use a storage path
>>>>> that is accessible from all Flink cluster components (e.g. using S3). Only
>>>>> references are stored in Kubernetes ConfigMaps [1].
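>>>>> Just as a sketch, with an object store that part of the configuration
>>>>> could look like this (the bucket name is a placeholder, and the matching
>>>>> S3 filesystem plugin and credentials would need to be set up as well):
>>>>>
>>>>> high-availability.storageDir: s3://<your-bucket>/flink/recovery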
>>>>>
>>>>> Best,
>>>>> Matthias
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/ha/kubernetes_ha.html#configuration
>>>>>
>>>>> On Wed, Feb 10, 2021 at 6:08 PM Matthias Pohl <matth...@ververica.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Daniel,
>>>>>> what's the exact configuration you used? Did you use the resource
>>>>>> definitions provided in the Standalone Flink on Kubernetes docs [1]? Did
>>>>>> you do certain things differently in comparison to the documentation?
>>>>>>
>>>>>> Best,
>>>>>> Matthias
>>>>>>
>>>>>> [1]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html#appendix
>>>>>>
>>>>>> On Wed, Feb 10, 2021 at 1:31 PM Daniel Peled <
>>>>>> daniel.peled.w...@gmail.com> wrote:
>>>>>>
>>>>>>>
>>>>>>> Hey,
>>>>>>>
>>>>>>> We are using standalone Flink on Kubernetes,
>>>>>>> and we have followed the instructions in the following link:
>>>>>>> "Kubernetes HA Services"
>>>>>>>
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/ha/kubernetes_ha.html
>>>>>>> We were unable to make it work.
>>>>>>> We are facing a lot of problems.
>>>>>>> For example, some of the jobs don't start, complaining that there are
>>>>>>> not enough slots available - although there are enough slots, and it
>>>>>>> seems as if the job manager is NOT aware of all the task managers.
>>>>>>> In another scenario we were unable to run any job at all.
>>>>>>> The Flink dashboard is unresponsive and we get the error
>>>>>>> "flink service temporarily unavailable due to an ongoing leader
>>>>>>> election. please refresh".
>>>>>>> We believe we are missing some configuration.
>>>>>>> Are there any more detailed instructions?
>>>>>>> Any suggestions/tips?
>>>>>>> Attached is the log of the job manager from one of the attempts.
>>>>>>>
>>>>>>> Please give me some advice.
>>>>>>> BR,
>>>>>>> Danny
>>>>>>>
>>>>>>
>>>>>
