Hi Piotrek,

Thanks for the feedback and reviews. Yes, as I explained previously in reply to the (2B) point, I think it is possible to create our own customized window assigner without any API change if we eliminate the requirement that
*"the same key should always results in the same offset"*.
I have updated the document to reflect this point. Perhaps you could share some more insight on this particular question.

The feedback and effort are much appreciated :-)

--
Rong

On Tue, Oct 9, 2018 at 12:15 AM Piotr Nowojski <pi...@data-artisans.com> wrote:

> Hi,
>
> Sorry for getting back so late and thanks for the improved document :) I
> think now I got your idea.
>
> You are now trying (or have you already done it?) to implement a custom
> window assigner that would work as in the [Figure 3] from your document?
>
> I think that indeed should be possible and relatively easy to do without
> the need for API changes.
>
> Piotrek
>
> On 1 Oct 2018, at 17:48, Rong Rong <walter...@gmail.com> wrote:
>
> Hi Piotrek,
>
> Thanks for the quick response. To follow up on the questions:
>
> Re 1). Yes, it is causing network I/O issues on Kafka itself.
>
> Re 2a). Actually, I thought about it last weekend and I think there's a
> workaround: we directly duplicate the key extraction logic in our
> window assigner. Since the element record is passed in, it should be OK to
> create a customized window assigner that handles offsets based on key by
> extracting the key from the record.
> This was the main part of my change: to let WindowAssignerContext
> provide the current key information extracted from KeyedStateBackend.
>
> Re 2b). Thanks for the explanation, we will try to profile it! We've seen
> some weird behavior previously when loading up the network buffer in
> Flink, although it is very rare and hard to reproduce consistently.
>
> Re 3). Regarding the event time offset, I think I might not have explained
> my idea clearly. I added some more details to the doc. Please kindly take a
> look.
> In a nutshell, window offsets do not change the event time of records at
> all. We simply change how the window assigner assigns records to windows
> with different offsets.
>
> --
> Rong
>
> On Fri, Sep 28, 2018 at 8:03 AM Piotr Nowojski <pi...@data-artisans.com> wrote:
>
> Hi,
>
> Thanks for the response again :)
>
> Re 1). Do you mean that this extra burst of external network I/O traffic is
> causing disturbance with other systems reading/writing from Kafka? With
> Kafka itself?
>
> Re 2a). Yes, it should be relatively simple, however any new brick makes
> the overall component more and more complicated, which has long term
> consequences for maintenance/refactoring/adding new features/just making
> reading the code more difficult etc.
>
> Re 2b). With a setup of:
>
> WindowOperator -> RateLimitingOperator(maxSize = 0) -> Sink
>
> RateLimitingOperator would just slow down data processing via the standard
> back pressure mechanism. Flink by default allocates 10% of the memory to
> network buffers; we could partially rely on them to buffer some smaller
> bursts without blocking the whole pipeline altogether. Essentially
> RateLimitingOperator(maxSize = 0) would cause back pressure and slow down
> record emission from the WindowOperator. So yes, there would still be batch
> emission of the data in the WindowOperator itself, but it would be
> prolonged/slowed down in terms of wall time because of downstream back
> pressure caused by RateLimitingOperator.
>
> Btw, with your proposal, with what event time do you want to emit the
> delayed data? If the event time of the produced records changes based on
> using/not using window offsets, this can cause quite a lot of semantic
> problems and side effects for the downstream operators.
>
> Piotrek
>
> On 28 Sep 2018, at 15:18, Rong Rong <walter...@gmail.com> wrote:
>
> Hi Piotrek,
>
> Thanks for getting back to me so quickly. Let me explain.
>
> Re 1). As I explained in the doc, we are using a basic Kafka-in, Kafka-out
> system with the same partition count on both sides. It is causing degraded
> performance in external network I/O traffic.
> It is definitely possible to configure more resources (e.g. a larger
> partition count) for the output to handle the burst, but it can also be
> resolved through some sort of internal smoothing (either through rate
> limiting as you suggested, or through the dynamic offset).
>
> Re 2a). Yes, I agree and I think I understand your concern. However, it is
> one simple API addition with default fallbacks that are fully
> backward-compatible (or I think it can be made fully compatible if I missed
> any corner cases).
>
> Re 2b). Yes, there could be many potential issues that cause data bursts.
> However, putting aside the scenarios caused by the nature of the
> stream (data skew, bursts), which affect both input and output, we want to
> address specifically the case where a smooth input *deterministically*
> results in bursty output. What we are proposing here is much like the case
> of users' custom operators. However, we can't do so unless
> there's an API to control the offset.
>
> Regarding the problem of rate limiting and skew: I think I missed one key
> point from you. I think you are right. If we introduce a *new rate limiting
> operator* (with size > 0), it will
> - cause extra state usage within the container (moving all the
> components from the window operator and storing them in the rate limit
> buffer at window boundaries).
> - not cause the data skew problem: the data skew problem I mentioned is
> that if some data are buffered in window operator state longer than
> other data, then potentially some panes will handle more late
> arrivals than others.
>
> However, if it is possible to get rid of the extra memory usage, we will
> definitely benchmark the rate-limit approach. Can you be more specific on
> how setting the rate-limit operator (size = 0) can resolve the burst issue?
>
> If I understand correctly, the back pressure will cause the watermark to not
> advance, but once it crosses the window boundary, there will still be a
> batch of messages emitted out of the window operator at the same time,
> correct?
>
> Thanks,
> Rong
>
> On Fri, Sep 28, 2018 at 1:25 AM Piotr Nowojski <pi...@data-artisans.com> wrote:
>
> Hi,
>
> Re 1. Can you be more specific? What system are you using, what's
> happening and how does it break?
>
> While delaying window firing is probably the most cost effective solution
> for this particular problem, it has some disadvantages:
> a) putting even more logic into an already complicated component
> b) not solving potential similar problems. I can easily imagine the same
> issue happening in scenarios other than "interval based operators", such as:
> - input sources faster than output sinks
> - data skew
> - data bursts
> - users' custom operators causing data bursts
> - users' custom operators being prone to bursts (maybe something
> like AsyncOperator or something else that works with an external system) -
> so the problem might not necessarily be limited to the sinks
>
> As far as I recall, there were some users reporting similar issues.
>
> Regarding potential drawbacks of rate limiting, I didn't understand this
> part:
>
> > However the problem is similar to delay triggers which can provide
> > degraded performance for skew sensitive downstream service, such as
> > feeding feature extraction results to deep learning model.
>
> The way I could imagine RateLimitingOperator is that it could take
> parameters: rate limit, buffer size limit.
>
> With buffer size = 0, it would immediately cause back pressure if the rate
> is exceeded.
> With buffer size > 0, it would first buffer events in state and cause
> back pressure only when reaching the max buffer size.
>
> For the case with WindowOperator, if windows are evicted and removed from
> the state, using buffer size > 0 wouldn't cause increased state usage, it
> would only move the state from the WindowOperator to the
> RateLimitingOperator.
>
> Piotrek
>
> On 27 Sep 2018, at 17:28, Rong Rong <walter...@gmail.com> wrote:
>
> Hi Piotrek,
>
> Yes, to be more clear,
> 1) the network I/O issue I am referring to is between Flink and the external
> sink. We did not see issues between operators.
> 2) yes, we've considered rate limiting sink functions as well, which is also
> mentioned in the doc along with some of the pros and cons we identified.
>
> This kind of problem seems to only occur in WindowOperator so far, but yes,
> it can probably occur in any aligned interval based operator.
>
> --
> Rong
>
> On Wed, Sep 26, 2018 at 11:44 PM Piotr Nowojski <pi...@data-artisans.com> wrote:
>
> Hi,
>
> Thanks for the proposal. Could you provide more
> background/explanation/motivation for why you need such a feature? What do
> you mean by "network I/O" degradation?
>
> On their own, burst writes shouldn't cause problems within Flink. If they
> do, we might want to fix the original underlying problem, and if they are
> causing problems in external systems, we also might think about other
> approaches to fix/handle the problem (write rate limiting?), which might be
> more general and not fix only bursts originating from WindowOperator.
>
> I'm not saying that your proposal is bad or anything, but I would just like
> to have more context :)
>
> Piotrek.
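
The two RateLimitingOperator modes discussed above (buffer size = 0 versus buffer size > 0) can be sketched with a toy, tick-based model. This is only an illustration under stated assumptions: `RateLimitSketch`, `simulate`, `ratePerTick` and `maxBuffer` are hypothetical names, nothing here is a Flink API, and back pressure is modelled simply as records that stay upstream (for example in window state) until they can move.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a rate limiter placed after a bursty operator. Hypothetical, not a Flink API. */
final class RateLimitSketch {

    /** Snapshot taken at the end of one tick. */
    static final class Tick {
        final int emitted;      // records passed downstream during this tick
        final int buffered;     // records held inside the limiter's own buffer
        final int heldUpstream; // records still blocked upstream (back pressure)

        Tick(int emitted, int buffered, int heldUpstream) {
            this.emitted = emitted;
            this.buffered = buffered;
            this.heldUpstream = heldUpstream;
        }
    }

    /**
     * @param arrivals    records produced upstream at each tick (a window firing is one big entry)
     * @param ratePerTick maximum records emitted downstream per tick, must be positive
     * @param maxBuffer   limiter buffer capacity; 0 means pure back pressure
     */
    static List<Tick> simulate(int[] arrivals, int ratePerTick, int maxBuffer) {
        if (ratePerTick <= 0 || maxBuffer < 0) {
            throw new IllegalArgumentException("ratePerTick must be > 0 and maxBuffer >= 0");
        }
        List<Tick> ticks = new ArrayList<>();
        int buffered = 0;
        int heldUpstream = 0;
        for (int i = 0; i < arrivals.length || buffered > 0 || heldUpstream > 0; i++) {
            if (i < arrivals.length) {
                heldUpstream += arrivals[i];
            }
            int pending = buffered + heldUpstream;
            int emitted = Math.min(ratePerTick, pending);
            int remaining = pending - emitted;
            // The limiter absorbs what fits in its buffer; the rest stays upstream.
            buffered = Math.min(maxBuffer, remaining);
            heldUpstream = remaining - buffered;
            ticks.add(new Tick(emitted, buffered, heldUpstream));
        }
        return ticks;
    }
}
```

In this model a burst of 10 records with `ratePerTick = 3` is emitted as 3, 3, 3, 1 whether `maxBuffer` is 0 or 5. Only the place where the pending records wait differs, which matches the point that a buffer size > 0 moves state from the WindowOperator to the RateLimitingOperator rather than adding to it.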
>
> On 26 Sep 2018, at 19:21, Rong Rong <walter...@gmail.com> wrote:
>
> Hi Dev,
>
> I was wondering if there's any previous discussion regarding how to handle
> burst network I/O when deploying Flink applications with window operators.
>
> We've recently seen some significant network I/O degradation when trying to
> use sliding windows to perform rolling aggregations. The pattern is very
> periodic: output connections get no traffic for a period of time until a
> burst at window boundaries (in our case every 5 minutes).
>
> We have drafted a doc
> <https://docs.google.com/document/d/1fEhbcRgxxX8zFYD_iMBG1DCbHmTcTRfRQFXelPhMFiY/edit?usp=sharing>
> on how we propose to handle it to smooth the output traffic spikes. Please
> kindly take a look, any comments and suggestions are highly appreciated.
>
> --
> Rong
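
The workaround from the "Re 2a)" reply (extract the key from the record inside the assigner and pick the window offset from it) can be sketched in plain Java. This is a minimal sketch under assumptions, not the code from the linked doc: `KeyedOffsetWindowSketch`, `keyExtractor` and `buckets` are hypothetical names, and the hash-based bucketing is just one possible way to spread offsets. The window start arithmetic follows Flink's `TimeWindow.getWindowStartWithOffset`, and the loop mirrors how Flink's sliding window assigners enumerate windows. Record timestamps are never modified; only the window boundaries move.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Sliding-window assignment with a per-key offset. Hypothetical sketch, no Flink dependency. */
final class KeyedOffsetWindowSketch<T> {
    private final long size;    // window length in ms
    private final long slide;   // window slide in ms
    private final int buckets;  // number of distinct offsets to spread firings over
    private final Function<T, ?> keyExtractor; // duplicates the job's key extraction logic

    KeyedOffsetWindowSketch(long size, long slide, int buckets, Function<T, ?> keyExtractor) {
        if (size <= 0 || slide <= 0 || buckets <= 0) {
            throw new IllegalArgumentException("size, slide and buckets must be positive");
        }
        this.size = size;
        this.slide = slide;
        this.buckets = buckets;
        this.keyExtractor = keyExtractor;
    }

    /** Stable offset in [0, slide) derived from the key's hash (assumed scheme). */
    long offsetFor(T element) {
        int bucket = Math.floorMod(keyExtractor.apply(element).hashCode(), buckets);
        return bucket * (slide / buckets);
    }

    /** Same arithmetic as Flink's TimeWindow.getWindowStartWithOffset. */
    static long windowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    /** Returns {start, end} pairs (end exclusive) of every window containing the timestamp. */
    List<long[]> assignWindows(T element, long timestamp) {
        long offset = offsetFor(element);
        List<long[]> windows = new ArrayList<>();
        long lastStart = windowStartWithOffset(timestamp, offset, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }
}
```

With `size = 10`, `slide = 5` and `buckets = 5`, an element with integer key 7 at timestamp 13 lands in windows [12, 22) and [7, 17), while key 5 lands in [10, 20) and [5, 15). Keys in different buckets therefore reach their window ends at different times, which is what spreads the output burst.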