Hi Ufuk,

The ProcessFunction receives elements and buffers them in a MapState, and periodically (for example every x seconds) registers processing-time timers according to rules it receives from a connected rule stream. When a timer fires, I pop the next element from the state, send a request to an external server, and collect the response. The requests to the external server should happen periodically rather than continuously, which is why I control them with timers and buffer the elements in RocksDB state.
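A minimal plain-Java simulation of the buffer-then-drain pattern described above, for a single key. It is not Flink code: a `TreeMap` stands in for the `MapState`, a `TreeSet` of timestamps stands in for registered processing-time timers, and `advanceTo()` plays the role of time passing and `onTimer()` firing. The fixed `intervalMs` is an assumption standing in for the rules from the connected rule stream, and `TimerDrainedBuffer` and `externalCall` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.Function;

// Hypothetical single-key model of the pattern; not Flink API.
class TimerDrainedBuffer<T, R> {
    private final TreeMap<Long, T> buffer = new TreeMap<>(); // stands in for MapState<Long, T>
    private final TreeSet<Long> timers = new TreeSet<>();    // pending processing-time timers
    private final long intervalMs;                           // assumed fixed drain interval
    private final Function<T, R> externalCall;               // hypothetical external-server request
    private long nextSeq = 0;                                // arrival order used as map key

    TimerDrainedBuffer(long intervalMs, Function<T, R> externalCall) {
        this.intervalMs = intervalMs;
        this.externalCall = externalCall;
    }

    // Like processElement(): park the element and make sure a timer is pending.
    void processElement(T element, long nowMs) {
        buffer.put(nextSeq++, element);
        if (timers.isEmpty()) {
            timers.add(nowMs + intervalMs);
        }
    }

    // Like onTimer(): each due timer releases exactly one buffered element,
    // so requests go out at most once per interval however fast elements
    // arrive. A follow-up timer is registered while elements remain.
    List<R> advanceTo(long nowMs) {
        List<R> responses = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= nowMs) {
            long firedAt = timers.pollFirst();
            Map.Entry<Long, T> next = buffer.pollFirstEntry();
            if (next != null) {
                responses.add(externalCall.apply(next.getValue()));
            }
            if (!buffer.isEmpty()) {
                timers.add(firedAt + intervalMs);
            }
        }
        return responses;
    }

    int buffered() {
        return buffer.size();
    }
}
```

With a 1000 ms interval and three elements arriving at t=0, nothing is emitted before t=1000, one response is emitted at t=1000, and the remaining two come out at t=2000 and t=3000. The drain rate is set by the timers alone, independent of the input rate.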
2017-04-24 13:48 GMT+02:00 Ufuk Celebi <u...@apache.org>:

> @Yassine: no, there is no way to disable the back pressure mechanism. Do
> you have more details about the last two operators? What do you mean by
> the process function being slow on purpose?
>
> @Rune: with 1.3, Flink will configure the internal buffers so that not
> too much data is buffered in them
> (https://issues.apache.org/jira/browse/FLINK-4545). You could try the
> current master and check whether it improves the checkpointing behaviour
> under back pressure. Out of curiosity, are you using the async I/O API for
> the communication with the external REST service
> (https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/asyncio.html)?
>
> – Ufuk
>
>
> On Mon, Apr 24, 2017 at 11:08 AM, Rune Skou Larsen <r...@trifork.com>
> wrote:
>
>> Sorry I can't help you, but we're also experiencing slow checkpointing
>> when there is back pressure from the sink.
>>
>> I tried the HDFS, S3, and RocksDB state backends, but to no avail:
>> checkpointing always times out under back pressure.
>>
>> Can we somehow reduce Flink's internal buffer sizes so that checkpointing
>> under back pressure becomes faster?
>>
>> - Rune
>>
>> ---
>>
>> Our current setup (improvement suggestions welcome!):
>>
>> Flink 1.2.0, YARN on AWS EMR, 1 master + 3 slaves, m4.xlarge
>>
>> program_parallelism: 12
>> taskmanagers: 6
>> slotsPerTaskManager: 4
>> taskmanager_heap_mb: 4096
>> jobmanager_heap_mb: 1024
>>
>> Basic program structure:
>>
>> 1) read batch from Kinesis
>> 2) split batch and shuffle using a custom partitioner (consistent hashing)
>> 3) enrich using an external REST service
>> 4) write to database (this step is the bottleneck)
>>
>> On 24-04-2017 09:32, Yassine MARZOUGUI wrote:
>>
>> I'm sorry, guys, if you received multiple copies of this mail. I kept
>> trying to send it yesterday, but it looks like the mailing list was stuck
>> and didn't dispatch it until now. Sorry for the disturbance.
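On the async I/O question for the REST enrichment step: the idea behind that API is to keep several requests in flight at once instead of blocking on one call per record. The following is a plain-Java sketch of that idea under stated assumptions, not Flink's `AsyncFunction` API. `OrderedAsyncEnricher`, `enrichAll`, and `blockingCall` are hypothetical names, and `capacity` plays a role similar to the capacity limit of Flink's async operator. Results are returned in input order, like the ordered mode.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

// Hypothetical plain-Java illustration of bounded, ordered async requests.
class OrderedAsyncEnricher {

    static <I, O> List<O> enrichAll(List<I> inputs,
                                    Function<I, O> blockingCall, // hypothetical REST call
                                    int capacity,
                                    ExecutorService pool) {
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        Semaphore inFlight = new Semaphore(capacity);
        List<CompletableFuture<O>> pending = new ArrayList<>();
        for (I input : inputs) {
            // Once `capacity` requests are outstanding, the caller waits here;
            // this waiting is what would surface upstream as back pressure.
            inFlight.acquireUninterruptibly();
            CompletableFuture<O> result =
                    CompletableFuture.supplyAsync(() -> blockingCall.apply(input), pool);
            // Free a slot as soon as the request finishes, successfully or not.
            result.whenComplete((value, error) -> inFlight.release());
            pending.add(result);
        }
        // Join in submission order so output order matches input order,
        // even if the requests completed out of order.
        List<O> out = new ArrayList<>(pending.size());
        for (CompletableFuture<O> f : pending) {
            out.add(f.join()); // a failed request is rethrown as CompletionException
        }
        return out;
    }
}
```

With `capacity` requests overlapping, throughput against a slow service scales roughly with the number of concurrent requests the service tolerates, whereas a synchronous call per record is limited to one round trip at a time.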
>> On Apr 23, 2017 20:53, "Yassine MARZOUGUI" <y.marzou...@mindlytix.com>
>> wrote:
>>>
>>> Hi all,
>>>
>>> I have a streaming pipeline as follows:
>>> 1 - read a folder continuously from HDFS
>>> 2 - filter duplicates (using keyBy(x -> x) and keeping a state per key
>>> indicating whether it has been seen)
>>> 3 - schedule some future actions on the stream using a ProcessFunction
>>> and processing-time timers (elements are kept in a MapState)
>>> 4 - write results back to HDFS using a BucketingSink.
>>>
>>> I am using the RocksDBStateBackend and Flink 1.3-SNAPSHOT (commit 9fb074c).
>>> Currently the source contains just one file of 1 GB, so that's the
>>> maximum state the job might hold. I noticed that the back pressure on
>>> operators #1 and #2 is high, and the split reader has read only 60 MB of
>>> the 1 GB source file. I suspect this is because the ProcessFunction is
>>> slow (on purpose). However, it looks like this affected the checkpoints,
>>> which are failing after the timeout (set to 2 hours); see the attached
>>> screenshot.
>>>
>>> In the JobManager logs I keep getting warnings:
>>>
>>> 2017-04-23 19:32:38,827 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 8 from 210769a077c67841d980776d8caece0a of job 6c7e44d205d738fc8a6cb4da181d2d86.
>>>
>>> Is the high back pressure the cause of the checkpoints being too slow?
>>> If so, is there a way to disable the back pressure mechanism, since the
>>> records will be buffered in the RocksDB state anyway, which is backed by
>>> the disk?
>>>
>>> Thank you.
>>> Best,
>>> Yassine
>>
>> --
>>
>> Venlig hilsen/Best regards *Rune Skou Larsen*
>>
>> Trifork Public A/S Dyssen 1 · DK-8200 Aarhus N · Denmark
>> Phone +45 3160 2497 Skype: rsltrifork Twitter: RuneSkouLarsen
>
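For step 2 of Yassine's pipeline (filtering duplicates with `keyBy(x -> x)` and a per-key "seen" flag), here is a plain-Java model of what that keyed state does. It is a sketch, not Flink code: the `HashMap` stands in for a per-key `ValueState<Boolean>`, and `KeyedDeduplicator` is a hypothetical name. It also shows why the 1 GB source bounds the state: one flag is kept for every distinct record.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of keyed deduplication; the map stands in for
// Flink's per-key ValueState<Boolean>.
class KeyedDeduplicator<T> {
    private final Map<T, Boolean> seenState = new HashMap<>();

    // Returns true only for the first occurrence of a record; later
    // occurrences find the flag already set and are filtered out.
    boolean filter(T record) {
        return seenState.putIfAbsent(record, Boolean.TRUE) == null;
    }

    // One entry per distinct record ever seen. Without some cleanup
    // (for example a timer that clears old flags), this state only grows.
    int keysHeld() {
        return seenState.size();
    }
}
```

Feeding `a, b, a, c, b` forwards `a, b, c` and leaves three flags in state.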