Hi Radoslav,

> 1. Is it a good idea to have regular savepoints (say on a daily basis)?
> 2. Is it possible to have high availability with Per-Job mode? Or maybe I
> should go with session mode and make sure that my flink cluster is running a
> single job?

Yes, you can achieve HA with per-job mode using ZooKeeper[1]. Looking at your
configuration, you also need to enable checkpointing[2] by setting
execution.checkpointing.interval. Checkpoints are triggered automatically and
let the job resume from the latest checkpoint after a failure.
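For illustration, the relevant flink-conf.yaml entries would look roughly like
the following; the ZooKeeper quorum, storage path, cluster id and interval are
placeholder values to adapt to your environment, not recommendations:

high-availability: zookeeper
high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
high-availability.storageDir: s3://bucket/flink-ha
high-availability.cluster-id: /data-enrichment-job
execution.checkpointing.interval: 10min
execution.checkpointing.mode: EXACTLY_ONCE

Note that high-availability.storageDir only holds the HA metadata (job graph,
checkpoint pointers); the checkpoint data itself still goes to
state.checkpoints.dir.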
> 3. Let's assume that savepoints should be triggered only before job
> update/deployment. How can I trigger a savepoint if my job is already
> consuming more than 80% of the allowed memory per pod in k8s? My observations
> show that k8s kills task managers (which are running as pods) and I need to
> retry it a couple of times.

I think with checkpointing enabled you no longer need to trigger a savepoint
manually under such conditions, because checkpoints are taken periodically
anyway.

> 4. Should I consider upgrading to version 1.12.3?
> 5. Should I consider switching off state.backend.rocksdb.memory.managed
> property even in version 1.12.3?

I'm not an expert on the state backend, but it seems the fix for that issue is
only applied to the Docker image, so I guess you can package a custom image
yourselves if you do not want to upgrade. However, if you are using the Native
K8s mode[3] and there is no compatibility issue, I think it would be good to
upgrade, because there are also lots of improvements[4] in 1.12.

> 6. How do I decide when the job parallelism should be increased? Are there
> some metrics which can lead me to a clue that the parallelism should be
> increased?

Since there are 6 Kafka sources in your job, the source parallelism should
first be aligned with the number of partitions in those topics. For metrics,
you can look at the backpressure of the tasks and at numRecordsOutPerSecond[5].
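You can check both in the web UI, or roughly like this via the REST API (the
job manager host, job id and vertex id below are placeholders you would first
look up with GET /jobs and GET /jobs/<job-id>; please verify the exact paths
against the REST API reference for your version):

curl http://<jobmanager>:8081/jobs/<job-id>/vertices/<vertex-id>/backpressure
curl "http://<jobmanager>:8081/jobs/<job-id>/vertices/<vertex-id>/subtasks/metrics?get=numRecordsOutPerSecond"

If a vertex shows sustained backpressure, or its throughput stops growing while
the consumer lag of the source topics keeps increasing, that is usually the
point where raising the parallelism of the bottlenecked operators helps.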
[1] https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/ha/zookeeper_ha/
[2] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html
[4] https://issues.apache.org/jira/browse/FLINK-17709
[5] https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/metrics.html#io

Best,
Yangze Guo

On Mon, Apr 26, 2021 at 4:14 PM Radoslav Smilyanov
<radoslav.smilya...@smule.com> wrote:
>
> Hi all,
>
> I am having multiple questions regarding Flink :) Let me give you some
> background of what I have done so far.
>
> Description
> I am using Flink 1.11.2. My job is doing data enrichment. Data is consumed
> from 6 different kafka topics and it is joined via multiple
> CoProcessFunctions. On a daily basis the job is handling ~20 million events
> from the source kafka topics.
>
> Configuration
> These are the settings I am using:
>
> jobmanager.memory.process.size: 4096m
> jobmanager.memory.off-heap.size: 512m
> taskmanager.memory.process.size: 12000m
> taskmanager.memory.task.off-heap.size: 512m
> taskmanager.numberOfTaskSlots: 1
> parallelism.default: 5
> taskmanager.rpc.port: 6122
> jobmanager.execution.failover-strategy: region
> state.backend: rocksdb
> state.backend.incremental: true
> state.backend.rocksdb.localdir: /opt/flink/rocksdb
> state.backend.rocksdb.memory.managed: true
> state.backend.rocksdb.predefined-options: FLASH_SSD_OPTIMIZED
> state.backend.rocksdb.block.cache-size: 64mb
> state.checkpoints.dir: s3://bucket/checkpoints
> state.savepoints.dir: s3://bucket/savepoints
> s3.access-key: AWS_ACCESS_KEY_ID
> s3.secret-key: AWS_SECRET_ACCESS_KEY
> s3.endpoint: http://<internal_url>
> s3.path.style.access: true
> s3.entropy.key: _entropy_
> s3.entropy.length: 8
> presto.s3.socket-timeout: 10m
> client.timeout: 60min
>
> Deployment setup
> Flink is deployed in k8s with Per-Job mode having 1 job manager and 5 task
> managers. I have a daily cron job which triggers a savepoint in order to
> have a fresh copy of the whole state.
>
> Problems with the existing setup
> 1. I observe that savepoints are causing Flink to consume more than the
> allowed memory. I observe the behavior described in this stackoverflow post
> (which seems to be solved in 1.12.X if I am getting it right).
> 2. I cannot achieve high availability with Per-Job mode and thus I ended up
> having a regular savepoint on a daily basis.
>
> Questions
> 1. Is it a good idea to have regular savepoints (say on a daily basis)?
> 2. Is it possible to have high availability with Per-Job mode? Or maybe I
> should go with session mode and make sure that my flink cluster is running a
> single job?
> 3. Let's assume that savepoints should be triggered only before job
> update/deployment. How can I trigger a savepoint if my job is already
> consuming more than 80% of the allowed memory per pod in k8s? My observations
> show that k8s kills task managers (which are running as pods) and I need to
> retry it a couple of times.
> 4. Should I consider upgrading to version 1.12.3?
> 5. Should I consider switching off state.backend.rocksdb.memory.managed
> property even in version 1.12.3?
> 6. How do I decide when the job parallelism should be increased? Are there
> some metrics which can lead me to a clue that the parallelism should be
> increased?
>
> Best Regards,
> Rado