Hi,

Thanks for the response again :)
Re 1) Do you mean that this extra burst of external I/O network traffic is causing disturbance to other systems reading from/writing to Kafka? Or to Kafka itself?

Re 2a) Yes, it should be relatively simple, however any new brick makes the overall component more and more complicated, which has long-term consequences for maintenance, refactoring, adding new features, and simply making the code harder to read.

Re 2b) With a setup of: WindowOperator -> RateLimitingOperator(maxSize = 0) -> Sink, the RateLimitingOperator would just slow down data processing via the standard back pressure mechanism. Flink by default allocates 10% of the memory to network buffers, so we could partially rely on them to absorb some smaller bursts without blocking the whole pipeline altogether. Essentially RateLimitingOperator(maxSize = 0) would cause back pressure and slow down record emission from the WindowOperator. So yes, there would still be a batch emission of the data in the WindowOperator itself, but it would be prolonged/slowed down in terms of wall time because of the downstream back pressure caused by the RateLimitingOperator. (I have put a rough sketch of what I mean at the very bottom of this mail, below the quoted thread.)

Btw, with your proposal, with what event time do you want to emit the delayed data? If the event time of the produced records changes depending on whether window offsets are used or not, this can cause quite a lot of semantic problems and side effects for the downstream operators. (The second sketch at the bottom shows what I mean by a window offset.)

Piotrek

> On 28 Sep 2018, at 15:18, Rong Rong <walter...@gmail.com> wrote:
>
> Hi Piotrek,
>
> Thanks for getting back to me so quickly. Let me explain.
>
> Re 1) As I explained in the doc, we are using a basic Kafka-in, Kafka-out system with the same partition number on both sides. It is causing degraded performance in external I/O network traffic. It is definitely possible to configure more resources (e.g. a larger partition count) for the output to handle the burst, but it can also be resolved through some sort of smoothing done internally (either through rate limiting as you suggested, or through the dynamic offset).
>
> Re 2a) Yes, I agree and I think I understand your concern. However, it is one simple API addition with default fallbacks that are fully backward-compatible (or I think it can be made fully compatible if I missed any corner cases).
>
> Re 2b) Yes, there could be many potential issues that cause data bursts. However, putting aside the scenarios caused by the nature of the stream (data skew, bursts), which affect both input and output, we want to address specifically the case where a smooth input *deterministically* results in a burst output. What we are proposing here is pretty much exactly like the case of a user's custom operator. However, we can't do so unless there's an API to control the offset.
>
> Regarding the problem of rate limiting and skew, I think I missed one key point from you. I think you are right. If we introduce a *new rate limiting operator* (with size > 0) it will
> - cause extra state usage within the container (moving all the components from the window operator and storing them in the rate-limit buffer at window boundaries).
> - not cause a data skew problem: the data skew problem I mentioned is that, if data is buffered in window operator state longer for some records than for others, then potentially some panes will handle more late arrivals than others.
>
> However, if it is possible to get rid of the extra memory usage we will definitely benchmark the rate-limit approach. Can you be more specific on how setting the rate-limit operator (size = 0) can resolve the burst issue?
> If I understand correctly, the back pressure will cause the watermark to not advance, but once it crosses the window boundary, there will still be a batch of messages emitted out of the window operator at the same time, correct?
>
> Thanks,
> Rong
>
> On Fri, Sep 28, 2018 at 1:25 AM Piotr Nowojski <pi...@data-artisans.com> wrote:
>
>> Hi,
>>
>> Re 1. Can you be more specific? What system are you using, what's happening and how does it break?
>>
>> While delaying window firing is probably the most cost-effective solution for this particular problem, it has some disadvantages:
>> a) putting even more logic into an already complicated component
>> b) not solving potential similar problems. I can easily imagine the same issue happening in scenarios other than "interval based operators", such as:
>>   - input sources faster than output sinks
>>   - data skew
>>   - data bursts
>>   - users' custom operators causing data bursts
>>   - users' custom operators being prone to bursts (maybe something like AsyncOperator or something else that works with an external system) - so the problem might not necessarily be limited to the sinks
>>
>> As far as I recall, there were some users reporting similar issues.
>>
>> Regarding potential drawbacks of rate limiting, I didn't understand this part:
>>
>>> However the problem is similar to delay triggers which can provide degraded performance for skew sensitive downstream service, such as feeding feature extraction results to deep learning model.
>>
>> The way I could imagine RateLimitingOperator is that it could take two parameters: a rate limit and a buffer size limit.
>>
>> With buffer size = 0, it would immediately cause back pressure if the rate is exceeded.
>> With buffer size > 0, it would first buffer events in state and only cause back pressure once the max buffer size is reached.
>>
>> For the case with WindowOperator, if windows are evicted and removed from the state, using buffer size > 0 wouldn't cause increased state usage, it would only move the state from the WindowOperator to the RateLimitingOperator.
>>
>> Piotrek
>>
>>> On 27 Sep 2018, at 17:28, Rong Rong <walter...@gmail.com> wrote:
>>>
>>> Hi Piotrek,
>>>
>>> Yes, to be more clear,
>>> 1) the network I/O issue I am referring to is between Flink and the external sink. We did not see issues between operators.
>>> 2) yes, we've considered rate-limiting sink functions as well, which is also mentioned in the doc along with some of the pros and cons we identified.
>>>
>>> This kind of problem seems to only occur in WindowOperator so far, but yes, it can probably occur with any aligned interval based operator.
>>>
>>> --
>>> Rong
>>>
>>> On Wed, Sep 26, 2018 at 11:44 PM Piotr Nowojski <pi...@data-artisans.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Thanks for the proposal. Could you provide more background/explanation/motivation for why you need such a feature? What do you mean by "network I/O" degradation?
>>>>
>>>> On its own, burst writes shouldn't cause problems within Flink. If they do, we might want to fix the original underlying problem, and if they are causing problems in external systems, we also might think about other approaches to fix/handle the problem (write rate limiting?), which might be more general and not fix only bursts originating from the WindowOperator. I'm not saying that your proposal is bad or anything, but I would just like to have more context :)
>>>>
>>>> Piotrek.
>>>>
>>>>> On 26 Sep 2018, at 19:21, Rong Rong <walter...@gmail.com> wrote:
>>>>>
>>>>> Hi Dev,
>>>>>
>>>>> I was wondering if there's any previous discussion regarding how to handle burst network I/O when deploying Flink applications with window operators.
>>>>>
>>>>> We've recently seen some significant network I/O degradation when trying to use sliding windows to perform rolling aggregations. The pattern is very periodic: output connections get no traffic for a period of time until a burst at window boundaries (in our case every 5 minutes).
>>>>>
>>>>> We have drafted a doc
>>>>> <https://docs.google.com/document/d/1fEhbcRgxxX8zFYD_iMBG1DCbHmTcTRfRQFXelPhMFiY/edit?usp=sharing>
>>>>> on how we proposed to handle it to smooth the output traffic spikes. Please kindly take a look, any comments and suggestions are highly appreciated.
>>>>>
>>>>> --
>>>>> Rong
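
P.S. The sketches mentioned at the top of this mail.

First, to make the maxSize = 0 variant more concrete, here is a minimal sketch of such a rate limiter implemented as a plain user function. The class name, the chosen rate and the use of Guava's RateLimiter are only my assumptions for illustration, nothing like this exists in Flink today:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

import com.google.common.util.concurrent.RateLimiter;

/**
 * Sketch of a "RateLimitingOperator(maxSize = 0)": a function that blocks once the
 * configured rate is exceeded. Because the task thread blocks, back pressure
 * propagates upstream and slows down record emission from the WindowOperator.
 */
public class RateLimitingFunction<T> extends RichFlatMapFunction<T, T> {

    private final double maxRecordsPerSecondPerSubtask;
    private transient RateLimiter rateLimiter;

    public RateLimitingFunction(double maxRecordsPerSecondPerSubtask) {
        this.maxRecordsPerSecondPerSubtask = maxRecordsPerSecondPerSubtask;
    }

    @Override
    public void open(Configuration parameters) {
        // one limiter per parallel subtask; the effective job-wide rate is
        // maxRecordsPerSecondPerSubtask * parallelism
        rateLimiter = RateLimiter.create(maxRecordsPerSecondPerSubtask);
    }

    @Override
    public void flatMap(T value, Collector<T> out) {
        // blocks until a permit is available; this blocking is what creates back pressure
        rateLimiter.acquire();
        out.collect(value);
    }
}

It would be wired in between the window and the sink, e.g. windowedResult.flatMap(new RateLimitingFunction<>(1000.0)).addSink(kafkaSink), where 1000.0 is just an arbitrary example rate. One caveat to keep in mind: blocking the task thread also delays checkpoint barriers flowing through that operator, so the rate should not be set too aggressively low.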
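
Second, regarding the offset question, this is how a static window offset looks today in the DataStream API. My understanding is that your proposal would make the offset vary per key/pane, and my question is whether the records emitted for a shifted pane would then also carry the shifted window boundaries as their event time. The job below is only a toy example I made up, it is not taken from your design doc:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowOffsetExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<Tuple2<String, Long>> input = env
                .fromElements(
                        Tuple2.of("key-a", 10_000L),
                        Tuple2.of("key-b", 20_000L),
                        Tuple2.of("key-a", 290_000L))
                .assignTimestampsAndWatermarks(
                        new AscendingTimestampExtractor<Tuple2<String, Long>>() {
                            @Override
                            public long extractAscendingTimestamp(Tuple2<String, Long> element) {
                                return element.f1;
                            }
                        });

        input
                .keyBy(0)
                // static offset: all panes of all keys fire at the same shifted
                // 5-minute boundary, so the output burst is merely moved, not smoothed.
                // A "dynamic" per-key/pane offset would spread the firings out, but then
                // the emitted records' window boundaries / event time may differ per key.
                .window(TumblingEventTimeWindows.of(Time.minutes(5), Time.seconds(30)))
                .sum(1)
                .print();

        env.execute("window offset sketch");
    }
}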