Hi Ufuk,

The ProcessFunction receives elements and buffers them in a MapState, and
periodically (for example every x seconds) registers processing-time timers
(according to rules it gets from a connected rule stream). When
a timer fires, I pop the next element from the state, send a request to an
external server, and collect the response.
The requests to the external server should happen periodically and not
continuously; that's why I control them using timers and buffer the elements
in RocksDB state.
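
To make the pattern concrete, here is a minimal sketch in plain Java, with no Flink dependency so that it runs on its own. The class name ThrottledRequester, the fixed intervalMs, and the externalCall function are hypothetical stand-ins: in the actual job the buffer is a MapState, the timers come from Flink's processing-time timer service, and the interval is driven by the rule stream rather than being a constant.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Function;

/**
 * Plain-Java model of the buffer-and-timer pattern (hypothetical names,
 * not Flink API). Elements are only buffered on arrival; the external
 * call happens at most once per timer firing.
 */
class ThrottledRequester<IN, OUT> {
    private final Queue<IN> buffer = new ArrayDeque<>(); // stands in for MapState
    private final Function<IN, OUT> externalCall;        // stands in for the external server
    private final long intervalMs;                       // assumed fixed; rule-driven in the real job
    private long nextTimer = -1;                         // -1 means no timer is registered

    ThrottledRequester(Function<IN, OUT> externalCall, long intervalMs) {
        this.externalCall = externalCall;
        this.intervalMs = intervalMs;
    }

    /** Called once per incoming element: buffer it, never call the server here. */
    void processElement(IN element, long now) {
        buffer.add(element);
        if (nextTimer < 0) {
            nextTimer = now + intervalMs; // register the first timer
        }
    }

    /** Moves the clock to 'now' and fires every due timer: one request per firing. */
    List<OUT> advanceTo(long now) {
        List<OUT> collected = new ArrayList<>();
        while (nextTimer >= 0 && nextTimer <= now) {
            long fired = nextTimer;
            IN next = buffer.poll();
            if (next != null) {
                collected.add(externalCall.apply(next));
            }
            // Re-register only while there is still something buffered.
            nextTimer = buffer.isEmpty() ? -1 : fired + intervalMs;
        }
        return collected;
    }

    int buffered() {
        return buffer.size();
    }
}
```

The point of the sketch is that processElement itself does no slow work: the output rate is limited by the timers, while arriving elements are simply appended to the state.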

2017-04-24 13:48 GMT+02:00 Ufuk Celebi <u...@apache.org>:

> @Yessine: no, there is no way to disable the back pressure mechanism. Do
> you have more details about the last two operators? What do you mean by
> the process function being slow on purpose?
>
> @Rune: with 1.3 Flink will configure the internal buffers in a way that
> not too much data is buffered in the internal buffers (
> https://issues.apache.org/jira/browse/FLINK-4545). You could try the
> current master and check whether it improves the checkpointing behaviour
> under back pressure. Out of curiosity, are you using the async I/O API for
> the communication with the external REST service
> (https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/asyncio.html)?
>
> – Ufuk
>
>
> On Mon, Apr 24, 2017 at 11:08 AM, Rune Skou Larsen <r...@trifork.com>
> wrote:
>
>> Sorry I can't help you, but we're also experiencing slow checkpointing
>> when there is backpressure from the sink.
>>
>> I tried HDFS, S3, and RocksDB state backends, but to no avail -
>> checkpointing always times out with backpressure.
>>
>> Can we somehow reduce Flink's internal buffer sizes, so checkpointing
>> with backpressure becomes faster?
>>
>> - Rune
>>
>> ---
>>
>> Our current setup (improvement suggestions welcome!):
>>
>> Flink 1.2.0,  yarn@AWS EMR, 1 master + 3 slaves, m4.xlarge
>>
>> program_parallelism: 12
>> taskmanagers: 6
>> slotsPerTaskManager: 4
>> taskmanager_heap_mb: 4096
>> jobmanager_heap_mb: 1024
>>
>> Basic program structure:
>>
>> 1) read batch from Kinesis
>>
>> 2) Split batch and shuffle using custom partitioner (consistent hashing).
>>
>> 3) enrich using external REST service
>>
>> 4) Write to database (This step is the bottleneck)
>> On 24-04-2017 09:32, Yassine MARZOUGUI wrote:
>>
>> I'm sorry if you received multiple copies of this mail. I kept
>> trying to send it yesterday, but it looks like the mailing list was stuck
>> and didn't dispatch it until now. Sorry for the disturbance.
>> On Apr 23, 2017 20:53, "Yassine MARZOUGUI" <y.marzou...@mindlytix.com>
>> wrote:
>>>
>>> Hi all,
>>> I have a Streaming pipeline as follows:
>>> 1 - read a folder continuously from HDFS
>>> 2 - filter duplicates (using keyBy(x -> x) and keeping a state per key
>>> indicating whether it has been seen)
>>> 3 - schedule some future actions on the stream using ProcessFunction and
>>> processing time timers (elements are kept in a MapState)
>>> 4 - write results back to HDFS using a BucketingSink.
>>> I am using RocksDBStateBackend, and Flink 1.3-SNAPSHOT (Commit: 9fb074c).
>>> Currently the source contains just one file of 1 GB, so that's the
>>> maximum state that the job might hold. I noticed that the backpressure on
>>> operators #1 and #2 is High, and the split reader has only read 60 MB
>>> out of the 1 GB source file. I suspect this is because the
>>> ProcessFunction is slow (on purpose). However, it looks like this affected
>>> the checkpoints, which are failing after the timeout (which is set to
>>> 2 hours); see the attached screenshot.
>>>
>>> In the job manager logs I keep getting warnings:
>>>
>>> 2017-04-23 19:32:38,827 WARN  
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received 
>>> late message for now expired checkpoint attempt 8 from 
>>> 210769a077c67841d980776d8caece0a of job 6c7e44d205d738fc8a6cb4da181d2d86.
>>>
>>> Is the high backpressure the cause of the checkpoints being too slow?
>>> If so, is there a way to disable the backpressure mechanism, since the
>>> records will be buffered in the RocksDB state anyway, which is backed by
>>> the disk?
>>> Thank you.
>>> Best,
>>> Yassine
>>>
>> --
>>
>> Venlig hilsen/Best regards *Rune Skou Larsen*
>>
>> Trifork Public A/S Dyssen 1 · DK-8200 Aarhus N · Denmark
>> Phone +45 3160 2497 Skype: rsltrifork Twitter:
>> RuneSkouLarsen
>>
>
>
