Sorry I can't help you, but we're also experiencing slow checkpointing when there is backpressure from the sink.

I tried the HDFS, S3, and RocksDB state backends, but to no avail: checkpointing always times out under backpressure.

Can we somehow reduce Flink's internal buffer sizes so that checkpointing under backpressure becomes faster?
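
To make the question concrete, this is the kind of change to flink-conf.yaml we have in mind (assuming these are still the right keys in Flink 1.2; the values are placeholders, not something we have validated):

    # assumed Flink 1.2 keys - values below are placeholders only
    taskmanager.network.numberOfBuffers: 1024   # fewer in-flight network buffers per TaskManager
    taskmanager.memory.segment-size: 16384      # smaller memory segment (buffer) size, in bytes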

- Rune

---

Our current setup (improvement suggestions welcome!):

Flink 1.2.0, YARN on AWS EMR, 1 master + 3 slaves, m4.xlarge

program_parallelism: 12
taskmanagers: 6
slotsPerTaskManager: 4
taskmanager_heap_mb: 4096
jobmanager_heap_mb: 1024

Basic program structure (a rough sketch follows the list):

1) Read a batch from Kinesis.

2) Split the batch and shuffle using a custom partitioner (consistent hashing).

3) Enrich via an external REST service.

4) Write to the database (this step is the bottleneck).
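
For reference, a stripped-down sketch of that topology. SplitBatch, RecordKeySelector, ConsistentHashPartitioner, EnrichViaRest and DatabaseSink are placeholder class names, and the Kinesis properties are trimmed to a minimum, so this is not our actual job:

    // Sketch only: SplitBatch, RecordKeySelector, ConsistentHashPartitioner,
    // EnrichViaRest and DatabaseSink are placeholders, not our real classes.
    import java.util.Properties;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class PipelineSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(12);

            // 1) read batches from Kinesis (region/credentials config trimmed down)
            Properties kinesisProps = new Properties();
            kinesisProps.setProperty("aws.region", "eu-west-1");
            DataStream<String> batches = env.addSource(
                    new FlinkKinesisConsumer<>("input-stream", new SimpleStringSchema(), kinesisProps));

            batches
                // 2) split each batch into records, then shuffle with consistent hashing
                .flatMap(new SplitBatch())
                .partitionCustom(new ConsistentHashPartitioner(), new RecordKeySelector())
                // 3) enrich each record by calling the external REST service
                .map(new EnrichViaRest())
                // 4) write to the database - the slow step that causes the backpressure
                .addSink(new DatabaseSink());

            env.execute("kinesis-enrich-write");
        }
    }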

On 24-04-2017 09:32, Yassine MARZOUGUI wrote:
I'm sorry, guys, if you received multiple instances of this mail; I kept trying to send it yesterday, but it looks like the mailing list was stuck and didn't dispatch it until now. Sorry for the disturbance.

On Apr 23, 2017 20:53, "Yassine MARZOUGUI" <y.marzou...@mindlytix.com> wrote:

    Hi all,
    I have a Streaming pipeline as follows:
    1 - read a folder continuously from HDFS
    2 - filter duplicates (using keyBy(x -> x) and keeping a state per key
    indicating whether it has been seen; a minimal sketch of this step
    follows below)
    3 - schedule some future actions on the stream using ProcessFunction
    and processing-time timers (elements are kept in a MapState)
    4 - write results back to HDFS using a BucketingSink.
    I am using the RocksDBStateBackend, and Flink 1.3-SNAPSHOT (commit
    9fb074c).
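
    To make step 2 concrete, here is a minimal sketch of the dedup
    function (the class and state names are only illustrative, not my
    actual code):

    // Minimal sketch of the dedup in step 2; names are illustrative only.
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    public class DedupFilter extends RichFlatMapFunction<String, String> {

        private transient ValueState<Boolean> seen;

        @Override
        public void open(Configuration parameters) {
            seen = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("seen", Boolean.class));
        }

        @Override
        public void flatMap(String value, Collector<String> out) throws Exception {
            if (seen.value() == null) {   // first occurrence of this key
                seen.update(true);
                out.collect(value);       // forward it once, drop later duplicates
            }
        }
    }

    // usage: stream.keyBy(x -> x).flatMap(new DedupFilter())
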
    Currently the source contains just a single 1 GB file, so that's the
    maximum state that the job might hold. I noticed that the
    backpressure on operators #1 and #2 is High, and the split reader
    has only read 60 MB out of the 1 GB source file. I suspect this is
    because the ProcessFunction is slow (on purpose). However, it looks
    like this affected the checkpoints, which are failing after the
    timeout (which is set to 2 hours); see the attached screenshot.
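
    A sketch of how the checkpoint timeout is set (the interval value
    here is just an example, the timeout is the 2 hours mentioned above):

    // sketch of the checkpoint configuration - the interval is only an example
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(10 * 60 * 1000);                            // e.g. every 10 minutes
    env.getCheckpointConfig().setCheckpointTimeout(2 * 60 * 60 * 1000); // give up after 2 hours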
    In the JobManager logs I keep getting warnings:

    2017-04-23 19:32:38,827 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator
    - Received late message for now expired checkpoint attempt 8 from
    210769a077c67841d980776d8caece0a of job 6c7e44d205d738fc8a6cb4da181d2d86.

    Is the high backpressure the cause of the checkpoints being too
    slow? If so, is there a way to disable the backpressure mechanism,
    since the records will be buffered in the RocksDB state after all,
    which is backed by disk?
    Thank you.
    Best,
    Yassine

--

Venlig hilsen/Best regards
Rune Skou Larsen

Trifork Public A/S
Dyssen 1 · DK-8200 Aarhus N · Denmark
Phone: +45 3160 2497 · Skype: rsltrifork · Twitter: RuneSkouLarsen
