Hi Monika,

I would start by identifying the operator that causes the backpressure. You can find more information on how to monitor backpressure in the docs [1]. You might also be interested in Seth's (cc'ed) webinar [2], where he also talks about how to debug backpressure.
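If you want to check this outside the web UI, I believe the same sampling that the "Back Pressure" tab uses (governed by the web.backpressure.* settings in your config) is also exposed through the JobManager REST API under /jobs/:jobid/vertices/:vertexid/backpressure. A rough Java sketch of polling it; the host/port, job id and vertex id are placeholders you would have to fill in, and the first request may still show a stale sample while a fresh one is being taken:

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.util.stream.Collectors;

    public class BackPressureProbe {

        public static void main(String[] args) throws Exception {
            // Placeholders: point this at your JobManager web/REST endpoint
            String jobManager = "http://bonusengine-prod-m-0:8081";
            String jobId = "<job-id>";       // e.g. taken from GET /jobs
            String vertexId = "<vertex-id>"; // e.g. taken from GET /jobs/<job-id>

            URL url = new URL(jobManager + "/jobs/" + jobId
                    + "/vertices/" + vertexId + "/backpressure");
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("GET");

            try (BufferedReader reader =
                     new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
                // The JSON response reports a backpressure level (ok/low/high)
                // and a ratio per subtask of the vertex.
                String body = reader.lines().collect(Collectors.joining("\n"));
                System.out.println(body);
            }
        }
    }

Whichever way you sample it, the goal is the same: find the first operator in the chain whose subtasks report a high ratio, because the operators upstream of it will show backpressure only as a consequence.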
Best,
Dawid

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/back_pressure.html#monitoring-back-pressure
[2] https://www.ververica.com/resources/webinar/webinar/debugging-flink-tutorial

On 22/04/2019 17:44, Monika Hristova wrote:
> Hello,
>
> We are experiencing regular backpressure (at least once a week). I would like to ask how we can fix it.
>
> Currently our configuration is:
> vi /usr/lib/flink/conf/flink-conf.yaml
> # Settings applied by Cloud Dataproc initialization action
> jobmanager.rpc.address: bonusengine-prod-m-0
> jobmanager.heap.mb: 4096
> jobmanager.rpc.port: 6123
> taskmanager.heap.mb: 4096
> taskmanager.memory.preallocate: false
> taskmanager.numberOfTaskSlots: 8
> #taskmanager.network.numberOfBuffers: 21952 # legacy, deprecated
> taskmanager.network.memory.fraction: 0.9
> taskmanager.network.memory.min: 67108864
> taskmanager.network.memory.max: 1073741824
> taskmanager.memory.segment-size: 65536
> parallelism.default: 52
> web.port: 8081
> web.timeout: 120000
> heartbeat.interval: 10000
> heartbeat.timeout: 100000
> yarn.application-attempts: 10
> high-availability: zookeeper
> high-availability.zookeeper.quorum: bonusengine-prod-m-0:2181,bonusengine-prod-m-1:2181,bonusengine-prod-m-2:2181
> high-availability.zookeeper.path.root: /flink
> #high-availability.zookeeper.storageDir: hdfs:///flink/recovery # legacy, deprecated
> high-availability.storageDir: hdfs:///flink/recovery
> flink.partition-discovery.interval-millis=60000
> fs.hdfs.hadoopconf: /etc/hadoop/conf
> state.backend: rocksdb
> state.checkpoints.dir: hdfs:///bonusengine/checkpoints/
> state.savepoints.dir: hdfs:///bonusengine/savepoints
> metrics.reporters: stsd
> metrics.reporter.stsd.class: org.apache.flink.metrics.statsd.StatsDReporter
> metrics.reporter.stsd.host: 127.0.0.1
> metrics.reporter.stsd.port: 8125
> zookeeper.sasl.disable: true
> yarn.reallocate-failed: true
> yarn.maximum-failed-containers: 32
> web.backpressure.refresh-interval: 60000
> web.backpressure.num-samples: 100
> web.backpressure.delay-between-samples: 50
>
> This runs on a Hadoop HA cluster: masters -> 8 vCPUs, 7.2 GB, and slaves -> 16 vCPUs, 60 GB, with the YARN configuration in the attached file.
>
> We have one YARN session started, in which 8 jobs run. Three of them consume the same source (Kafka), which is causing the backpressure, but only one of them experiences backpressure. The state of the job is 20-something MB and checkpointing is configured as follows:
> checkpointing.interval=300000 # interval in ms at which checkpoints are triggered
> checkpointing.pause_between_checkpointing=240000 # makes sure this much progress (in ms) happens between checkpoints
> checkpointing.timeout=60000 # checkpoints have to complete within this many ms or are discarded
> checkpointing.max_concurrent_checkpoints=1 # allows the given number of checkpoints to be in progress at the same time
> checkpointing.externalized_checkpoints.enabled=true # enables/disables externalized checkpoints, which are retained after job cancellation
>
> The checkpointing pause was increased and the timeout was reduced on multiple occasions, as the job kept failing, unable to recover from backpressure. RocksDB is the configured state backend. The problem keeps reproducing even with a one-minute timeout. I would also like to point out that the ideal checkpointing interval for that job would be 2 min.
> I would like to ask what might be the problem, or at least how to trace it?
>
> Best Regards,
> Monika Hristova
>
> Get Outlook for Android <https://aka.ms/ghei36>
>
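PS: In case it helps to double-check the semantics of the checkpoint settings quoted above, this is roughly how those values map onto the DataStream API; a minimal sketch, assuming the properties end up being applied to the job's StreamExecutionEnvironment:

    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointSettingsSketch {

        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // checkpointing.interval=300000 -> trigger a checkpoint every 5 minutes
            env.enableCheckpointing(300_000, CheckpointingMode.EXACTLY_ONCE);

            // checkpointing.pause_between_checkpointing=240000 -> at least 4 minutes of
            // progress between the end of one checkpoint and the start of the next
            env.getCheckpointConfig().setMinPauseBetweenCheckpoints(240_000);

            // checkpointing.timeout=60000 -> a checkpoint taking longer than 1 minute is discarded
            env.getCheckpointConfig().setCheckpointTimeout(60_000);

            // checkpointing.max_concurrent_checkpoints=1 -> only one checkpoint in flight at a time
            env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

            // checkpointing.externalized_checkpoints.enabled=true -> retain the checkpoint
            // after job cancellation so the job can later be restored from it
            env.getCheckpointConfig().enableExternalizedCheckpoints(
                    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        }
    }

The values in the sketch are the ones from the mail above; it is only meant to make the per-setting semantics explicit.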