A SourceFunction may only emit records while it holds the checkpointLock (just as `ContinuousFileMonitoringFunction` does). Flink only takes a checkpoint while it holds that lock, so a sleep() outside the synchronized block does not block checkpointing. This ensures correct behavior.
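For illustration, here is a minimal sketch of a source that follows this pattern (the class name and the pauseMillis parameter are made up; the point is that collect() happens under the checkpoint lock while sleep() does not):

import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Hypothetical throttled source: emits under the checkpoint lock and
// sleeps outside of it, so checkpoints can be taken during the pause.
public class ThrottledNumberSource implements SourceFunction<Long> {

    private static final long serialVersionUID = 1L;

    private final long pauseMillis;             // assumed throttle interval
    private volatile boolean isRunning = true;
    private long counter = 0L;

    public ThrottledNumberSource(long pauseMillis) {
        this.pauseMillis = pauseMillis;
    }

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        final Object lock = ctx.getCheckpointLock();
        while (isRunning) {
            synchronized (lock) {
                // emit only while holding the checkpoint lock
                ctx.collect(counter++);
            }
            // sleep outside the lock: a checkpoint can be taken here
            Thread.sleep(pauseMillis);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}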
Best, Fabian

2017-02-28 10:58 GMT+01:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:

> Hi Fabian,
>
> I have a related question regarding throttling at the source: if there is
> a sleep in the source, as in ContinuousFileMonitoringFunction.java
> <https://github.com/ymarzougui/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java#L198>:
>
>     while (isRunning) {
>         synchronized (checkpointLock) {
>             monitorDirAndForwardSplits(fileSystem, context);
>         }
>         Thread.sleep(interval);
>     }
>
> does it also block checkpoints?
> Thanks.
>
> Best,
> Yassine
>
> 2017-02-28 10:39 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:
>
>> Hi Giuliano,
>>
>> Flink 1.2 introduced the AsyncFunction, which asynchronously sends
>> requests to external systems (k-v stores, web services, etc.).
>> You can limit the number of concurrent requests, but AFAIK you cannot
>> specify a limit of requests per minute.
>> Maybe you can configure the function so that it works for your use case.
>>
>> Alternatively, you can take it as a blueprint for a custom operator,
>> because it handles watermarks and checkpoints correctly.
>>
>> I am not aware of a built-in mechanism to throttle a stream. You can do
>> it manually and simply sleep() in a MapFunction, but that will also block
>> checkpoints.
>>
>> Best, Fabian
>>
>> 2017-02-28 3:19 GMT+01:00 Giuliano Caliari <giuliano.cali...@gmail.com>:
>>
>>> Hello,
>>>
>>> I have an interesting problem that I'm having a hard time modeling on
>>> Flink, and I'm not sure if it's the right tool for the job.
>>>
>>> I have a stream of messages in Kafka that I need to group and send to
>>> an external web service, but I have some concerns that need to be
>>> addressed:
>>>
>>> 1. Rate-limited requests => only tens of requests per minute. If the
>>> limit is exceeded, the system has to stop making requests for a few
>>> minutes.
>>> 2. Crash handling => I'm using savepoints.
>>>
>>> My first (naive) solution was to implement this in a Sink function, but
>>> the requests may take a long time to return (up to minutes), so blocking
>>> the thread will interfere with the savepoint mechanism (see here
>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Rate-limit-processing-td11174.html>).
>>> Because of this, implementing the limit on the sink and relying on
>>> backpressure to slow down the flow will get in the way of savepointing.
>>> I'm not sure how big of a problem this will be, but in my tests I'm
>>> reading thousands of messages before the backpressure mechanism kicks in,
>>> and savepointing is taking around 20 minutes.
>>>
>>> My second implementation was sleeping in the Fetcher of the Kafka
>>> Consumer, but the web service request times have a huge variance, so I
>>> ended up implementing a communication channel between the sink and the
>>> source - an object with mutable state. Not great.
>>>
>>> So my question is: is there a nice way to limit the flow of messages in
>>> the system according to the rate given by a sink function? Is there any
>>> other way I could make this work on Flink?
>>>
>>> Thank you
>>>
>>> --
>>> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-requesting-external-web-service-with-rate-limited-requests-tp11952.html
>>> Sent from the Apache Flink User Mailing List archive. mailing list
>>> archive at Nabble.com.
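For reference, a rough sketch of the async I/O approach Fabian describes above. It is written against the ResultFuture interface of later Flink releases (1.5+); the Flink 1.2 API used an AsyncCollector instead. callWebService is a made-up placeholder for the real client call, and the capacity argument of AsyncDataStream.unorderedWait bounds the number of concurrent in-flight requests, not the requests per minute.

import java.util.Collections;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;

// Hypothetical enrichment function: calls the external web service on a
// separate thread pool so the operator thread is never blocked.
public class WebServiceAsyncFunction implements AsyncFunction<String, String> {

    private static final long serialVersionUID = 1L;

    @Override
    public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
        CompletableFuture
            .supplyAsync(() -> callWebService(key))
            .thenAccept(result -> resultFuture.complete(Collections.singleton(result)));
    }

    private String callWebService(String key) {
        return "response-for-" + key;   // placeholder for the real request
    }
}

// Wiring inside a job: the last argument (here 10) caps concurrent requests.
//   DataStream<String> enriched = AsyncDataStream.unorderedWait(
//       input, new WebServiceAsyncFunction(), 60_000L, TimeUnit.MILLISECONDS, 10);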