Hi Monika,

I would start by identifying the operator that causes the backpressure.
You can find more information on how to monitor backpressure in the
docs[1]. You might also be interested in Seth's (cc'ed) webinar[2],
where he also talks about how to debug backpressure.
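
If you prefer scripting this over clicking through the web UI, the same
back pressure samples are also exposed via the JobManager's REST API. A
rough, untested sketch (it assumes the session cluster's REST endpoint
on localhost:8081 and simply takes the first running job; adjust host,
port and job selection to your setup):

    # sketch: poll the REST API and print the back pressure level of
    # every vertex of the first job in the session cluster
    import json
    import time
    import urllib.request

    BASE = "http://localhost:8081"   # JobManager web/REST address (assumption)

    def get(path):
        with urllib.request.urlopen(BASE + path) as resp:
            return json.load(resp)

    job_id = get("/jobs")["jobs"][0]["id"]   # or pick the affected job explicitly
    for vertex in get("/jobs/" + job_id)["vertices"]:
        # the first request triggers sampling, so retry until results are ready
        for _ in range(10):
            bp = get("/jobs/" + job_id + "/vertices/" + vertex["id"]
                     + "/backpressure")
            if bp.get("status") == "ok":
                break
            time.sleep(5)
        print(vertex["name"], "->", bp.get("backpressure-level", "n/a"))

The first vertex downstream of the ones reporting "high" that itself
reports "ok" is usually the bottleneck.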

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/back_pressure.html#monitoring-back-pressure

[2]
https://www.ververica.com/resources/webinar/webinar/debugging-flink-tutorial

On 22/04/2019 17:44, Monika Hristova wrote:
> Hello,
>  
> We are experiencing regular backpressure (at least once a week). I
> would like to ask how we can fix it.
>  
> Currently our configuration is:
> vi /usr/lib/flink/conf/flink-conf.yaml
> # Settings applied by Cloud Dataproc initialization action
> jobmanager.rpc.address: bonusengine-prod-m-0
> jobmanager.heap.mb: 4096
> jobmanager.rpc.port: 6123
> taskmanager.heap.mb: 4096
> taskmanager.memory.preallocate: false
> taskmanager.numberOfTaskSlots: 8
> #taskmanager.network.numberOfBuffers: 21952     # legacy deprecated
> taskmanager.network.memory.fraction: 0.9
> taskmanager.network.memory.min: 67108864
> taskmanager.network.memory.max: 1073741824
> taskmanager.memory.segment-size: 65536
> parallelism.default: 52
> web.port: 8081
> web.timeout: 120000
> heartbeat.interval: 10000
> heartbeat.timeout: 100000
> yarn.application-attempts: 10
> high-availability: zookeeper
> high-availability.zookeeper.quorum:
> bonusengine-prod-m-0:2181,bonusengine-prod-m-1:2181,bonusengine-prod-m-2:2181
> high-availability.zookeeper.path.root: /flink
> #high-availability.zookeeper.storageDir: hdfs:///flink/recovery     #
> legacy deprecated
> high-availability.storageDir: hdfs:///flink/recovery
> flink.partition-discovery.interval-millis=60000
> fs.hdfs.hadoopconf: /etc/hadoop/conf
> state.backend: rocksdb
> state.checkpoints.dir: hdfs:///bonusengine/checkpoints/
> state.savepoints.dir: hdfs:///bonusengine/savepoints
> metrics.reporters: stsd
> metrics.reporter.stsd.class:
> org.apache.flink.metrics.statsd.StatsDReporter
> metrics.reporter.stsd.host: 127.0.0.1
> metrics.reporter.stsd.port: 8125
> zookeeper.sasl.disable: true
> yarn.reallocate-failed: true
> yarn.maximum-failed-containers: 32
> web.backpressure.refresh-interval: 60000
> web.backpressure.num-samples: 100
> web.backpressure.delay-between-samples: 50
>  
> with a Hadoop HA cluster: masters -> 8 vCPUs, 7.2 GB and slaves -> 16
> vCPUs, 60 GB, with the YARN configuration (see attached file)
>  
> We have one YARN session started, in which 8 jobs run. Three of them
> consume the same (Kafka) source, which is what causes the backpressure,
> but only one of them experiences it. The state of that job is around
> 20 MB, and checkpointing is configured as follows:
> checkpointing.interval=300000
> checkpointing.pause_between_checkpointing=240000 # makes sure value in
> ms of progress happens between checkpoints
> checkpointing.timeout=60000 # checkpoints have to complete within value
> in ms or are discarded
> checkpointing.max_concurrent_checkpoints=1 # allows given number of
> checkpoints to be in progress at the same time
> checkpointing.externalized_checkpoints.enabled=true # enables/disables
> externalized checkpoints which are retained after job cancellation
>  
> The checkpointing pause was increased and the timeout was reduced on
> multiple occasions, because the job kept failing, unable to recover
> from the backpressure. RocksDB is configured as the state backend. The
> problem keeps reproducing even with a one-minute timeout. I would also
> like to point out that the ideal checkpoint interval for that job would
> be 2 min.
> What might be the problem, or at least how can we trace it?
>  
> Best Regards,
> Monika Hristova
>

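PS: Since the job keeps failing because checkpoints cannot complete
while it is backpressured, it is also worth keeping an eye on the
checkpoint statistics of that job. Another rough, untested sketch
against the same REST endpoint (again assuming localhost:8081; the job
id placeholder below has to be replaced):

    # sketch: print a job's effective checkpoint configuration and the
    # duration and state size of its latest completed checkpoint
    import json
    import urllib.request

    BASE = "http://localhost:8081"   # JobManager web/REST address (assumption)
    JOB_ID = "<your-job-id>"         # e.g. taken from GET /jobs

    def get(path):
        with urllib.request.urlopen(BASE + path) as resp:
            return json.load(resp)

    print(get("/jobs/" + JOB_ID + "/checkpoints/config"))

    latest = get("/jobs/" + JOB_ID + "/checkpoints")["latest"].get("completed")
    if latest:
        print("last completed checkpoint:",
              latest["end_to_end_duration"], "ms,",
              latest["state_size"], "bytes of state")

If the end-to-end duration creeps towards your 60 s timeout while the
sources show back pressure, that matches the failure pattern you
describe.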
