Sorry, I can't help you, but we're also experiencing slow checkpointing
when there is backpressure from the sink.
I tried the HDFS, S3, and RocksDB state backends, but to no avail -
checkpointing always times out under backpressure.
Can we somehow reduce Flink's internal buffer sizes so that checkpointing
under backpressure becomes faster?
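We were thinking of something along these lines for a local test (not
sure these are the right knobs, so please correct me); on YARN the same
keys would presumably go into flink-conf.yaml instead:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SmallBuffersTest {
    public static void main(String[] args) throws Exception {
        // Idea: fewer and smaller network buffers should mean less in-flight
        // data queued ahead of the checkpoint barriers when the sink backpressures.
        Configuration conf = new Configuration();
        conf.setInteger("taskmanager.network.numberOfBuffers", 1024); // I believe the default is 2048
        conf.setInteger("taskmanager.memory.segment-size", 16384);    // half of the 32 KB default, I think

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(4, conf);

        // ... build the same pipeline as below and call env.execute() ...
    }
}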
- Rune
---
Our current setup (improvement suggestions welcome!):
Flink 1.2.0, yarn@AWS EMR, 1 master + 3 slaves, m4.xlarge
program_parallelism: 12
taskmanagers: 6
slotsPerTaskManager: 4
taskmanager_heap_mb: 4096
jobmanager_heap_mb: 1024
Basic program structure (rough sketch below):
1) Read a batch from Kinesis.
2) Split the batch and shuffle using a custom partitioner (consistent hashing).
3) Enrich each record using an external REST service.
4) Write to the database (this step is the bottleneck).
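For reference, the job is structured roughly like this (heavily
simplified - String records, placeholder names, and stubbed-out
enrichment/sink in place of our real REST and database code):

import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class EnrichmentJobSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(12);

        Properties kinesisProps = new Properties();
        kinesisProps.setProperty("aws.region", "eu-west-1"); // placeholder region/credentials

        // 1) read batches from Kinesis
        DataStream<String> batches = env.addSource(
                new FlinkKinesisConsumer<String>("our-stream", new SimpleStringSchema(), kinesisProps));

        // 2) split/shuffle with our custom partitioner (the real one does consistent hashing)
        DataStream<String> shuffled = batches.partitionCustom(
                new Partitioner<String>() {
                    @Override
                    public int partition(String key, int numPartitions) {
                        return Math.abs(key.hashCode()) % numPartitions; // simplified
                    }
                },
                new KeySelector<String, String>() {
                    @Override
                    public String getKey(String record) {
                        return record; // simplified key selector
                    }
                });

        // 3) enrich each record via an external REST service (a blocking call in our job)
        DataStream<String> enriched = shuffled.map(new MapFunction<String, String>() {
            @Override
            public String map(String record) {
                return record + "|enriched"; // stand-in for the REST lookup
            }
        });

        // 4) write to the database - this is the slow step that backpressures everything
        enriched.addSink(new SinkFunction<String>() {
            @Override
            public void invoke(String record) {
                // stand-in for our database write
            }
        });

        env.execute("enrichment-job-sketch");
    }
}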
On 24-04-2017 09:32, Yassine MARZOUGUI wrote:
I'm sorry if you received multiple copies of this mail; I kept
trying to send it yesterday, but it looks like the mailing list was
stuck and didn't dispatch it until now. Sorry for the disturbance.
On Apr 23, 2017 20:53, "Yassine MARZOUGUI" <y.marzou...@mindlytix.com> wrote:
Hi all,
I have a streaming pipeline as follows:
1 - read a folder continuously from HDFS
2 - filter duplicates (using keyBy(x -> x) and keeping a state per
key indicating whether it has been seen)
3 - schedule some future actions on the stream using a
ProcessFunction and processing-time timers (elements are kept in a
MapState)
4 - write the results back to HDFS using a BucketingSink.
I am using the RocksDBStateBackend and Flink 1.3-SNAPSHOT (commit:
9fb074c).
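To make steps 2 and 3 concrete, they look roughly like this (heavily
simplified - String elements and all names are placeholders; both
functions run on the stream after keyBy(x -> x)):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// 2) duplicate filter: one boolean of keyed state per key
class DedupFilter extends RichFlatMapFunction<String, String> {
    private transient ValueState<Boolean> seen;

    @Override
    public void open(Configuration parameters) {
        seen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("seen", Boolean.class));
    }

    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        if (seen.value() == null) {
            seen.update(true);
            out.collect(value); // emit the first occurrence only
        }
    }
}

// 3) schedule future actions: park each element in MapState and set a processing-time timer
class DelayedAction extends ProcessFunction<String, String> {
    private transient MapState<Long, String> pending;

    @Override
    public void open(Configuration parameters) {
        pending = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("pending", Long.class, String.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        long fireAt = ctx.timerService().currentProcessingTime() + 60_000; // placeholder delay
        pending.put(fireAt, value);
        ctx.timerService().registerProcessingTimeTimer(fireAt);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        String value = pending.get(timestamp);
        if (value != null) {
            out.collect(value);        // the "future action"
            pending.remove(timestamp);
        }
    }
}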
Currently the source contains just a single 1 GB file, so that's the
maximum state that the job might hold. I noticed that the
backpressure on operators #1 and #2 is high, and the split
reader has only read 60 MB out of the 1 GB source file. I
suspect this is because the ProcessFunction is slow (on purpose).
However, it looks like this affects the checkpoints, which are failing
after the timeout (which is set to 2 hours); see the attached screenshot.
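For reference, the checkpointing part of the job is set up roughly like
this (the interval value is a placeholder; the 2-hour timeout is the one
that keeps expiring):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // RocksDB state backend, checkpoints go to HDFS (placeholder path)
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));

        env.enableCheckpointing(10 * 60 * 1000);                            // interval, placeholder value
        env.getCheckpointConfig().setCheckpointTimeout(2 * 60 * 60 * 1000); // the 2-hour timeout
    }
}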
In the JobManager logs I keep getting warnings:
2017-04-23 19:32:38,827 WARN
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late
message for now expired checkpoint attempt 8 from
210769a077c67841d980776d8caece0a of job 6c7e44d205d738fc8a6cb4da181d2d86.
Is the high backpressure the cause of the checkpoints being too
slow? If so, is there a way to disable the backpressure mechanism,
since the records will be buffered in the RocksDB state anyway,
which is backed by disk?
Thank you.
Best,
Yassine
--
Venlig hilsen / Best regards
Rune Skou Larsen
goto / Trifork Public A/S
Dyssen 1 · DK-8200 Aarhus N · Denmark
Phone: +45 3160 2497 · Skype: rsltrifork · Twitter: RuneSkouLarsen