Hi Piotrek,

Thanks for the feedback and reviews.
Yes, as I explained previously in my reply to the (2b) point, I think it is
possible to create our own customized window assigner without any API
change if we eliminate the requirement that

*"the same key should always result in the same offset"*

I have updated the document to reflect this point. Perhaps you could share
some more insights regarding this particular question.
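For illustration, here is a minimal sketch of what I have in mind (class
and variable names are placeholders, and deriving the offset from the
element's hash is just one possible choice once that requirement is
dropped):

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class OffsetSlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {

    private final long size;
    private final long slide;

    public OffsetSlidingEventTimeWindows(long size, long slide) {
        this.size = size;
        this.slide = slide;
    }

    @Override
    public Collection<TimeWindow> assignWindows(
            Object element, long timestamp, WindowAssignerContext context) {
        // Derive the offset from the element itself -- no new API needed.
        // Since we dropped the "same key -> same offset" requirement, any
        // deterministic function of the element (here its hash) will do.
        long offset = Math.floorMod(element.hashCode(), slide);
        List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
        long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new TimeWindow(start, start + size));
        }
        return windows;
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }
}

The offset here no longer depends on the current key, which is exactly what
eliminating the requirement buys us: nothing new is needed on
WindowAssignerContext.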

The feedback and effort are much appreciated :-)

--
Rong


On Tue, Oct 9, 2018 at 12:15 AM Piotr Nowojski <pi...@data-artisans.com>
wrote:

> Hi,
>
> Sorry for getting back so late and thanks for the improved document :) I
> think I now get your idea.
>
> Are you now trying (or have you already done it?) to implement a custom
> window assigner that would work as in [Figure 3] from your document?
>
> I think that indeed should be possible and relatively easy to do without
> the need for API changes.
>
> Piotrek
>
> On 1 Oct 2018, at 17:48, Rong Rong <walter...@gmail.com> wrote:
>
> Hi Piotrek,
>
> Thanks for the quick response. To follow up on the questions:
> Re 1). Yes, it is causing network I/O issues on Kafka itself.
>
> Re 2a). Actually, I thought about it last weekend and I think there's a
> way to work around it: we can directly duplicate the key extraction logic
> in our window assigner. Since the element record is passed in, it should
> be OK to create a customized window assigner that handles key-based
> offsets by extracting the key from the record itself, as in the sketch
> below.
> This was the main part of my change: letting WindowAssignerContext
> provide the current key information extracted from the KeyedStateBackend.
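>
> As a rough sketch (the keySelector field is assumed to be a duplicated
> copy of the selector used in the upstream keyBy()):
>
> @Override
> public Collection<TimeWindow> assignWindows(
>         Object element, long timestamp, WindowAssignerContext context) {
>     try {
>         // Re-run the duplicated key extractor so that the same key
>         // always maps to the same offset in [0, slide).
>         long offset = Math.floorMod(keySelector.getKey(element).hashCode(), slide);
>         List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
>         long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
>         for (long start = lastStart; start > timestamp - size; start -= slide) {
>             windows.add(new TimeWindow(start, start + size));
>         }
>         return windows;
>     } catch (Exception e) {
>         throw new RuntimeException("Duplicated key extraction failed", e);
>     }
> }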
>
> Re 2b). Thanks for the explanation, we will try to profile it! We've seen
> some weird behaviors previously when loading up the network buffers in
> Flink, although they're very rare and hard to reproduce consistently.
>
> Re 3) Regarding the event time offset: I think I might not have explained
> my idea clearly. I added some more details to the doc; please kindly take
> a look.
> In a nutshell, window offsets do not change the event time of records at
> all. We simply change how the window assigner assigns records to windows,
> using various different offsets.
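>
> To make this concrete, a tiny example (illustrative numbers):
>
> // With slide = 5 min and offset = 1 min, a record with event time 12:03
> // is assigned to windows starting at ..., 12:01 instead of ..., 12:00:
> long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
> // timestamp = 12:03 -> start = 12:01; the record's event time stays 12:03.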
>
> --
> Rong
>
> On Fri, Sep 28, 2018 at 8:03 AM Piotr Nowojski <pi...@data-artisans.com>
> wrote:
>
> Hi,
>
> Thanks for the response again :)
>
> Re 1). Do you mean that this extra burst of external network I/O traffic
> is causing disturbance to other systems reading/writing from Kafka? Or to
> Kafka itself?
>
> Re 2a) Yes, it should be relatively simple; however, any new brick makes
> the overall component more and more complicated, which has long-term
> consequences for maintenance, refactoring, adding new features, and just
> making the code more difficult to read.
>
> Re 2b) With a setup of:
>
> WindowOperator -> RateLimitingOperator(maxSize = 0) -> Sink
>
> RateLimitingOperator would just slow down data processing via the
> standard back pressure mechanism. Flink by default allocates 10% of its
> memory to network buffers; we could partially rely on them to buffer some
> smaller bursts without blocking the whole pipeline altogether. Essentially,
> RateLimitingOperator(maxSize = 0) would cause back pressure and slow down
> record emission from the WindowOperator. So yes, there would still be
> batch emission of the data in the WindowOperator itself, but it would be
> prolonged/slowed down in terms of wall time because of the downstream
> back pressure caused by the RateLimitingOperator.
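>
> A minimal sketch of such an operator (names are placeholders; this
> variant uses Guava's RateLimiter and always blocks, i.e. maxSize = 0):
>
> import com.google.common.util.concurrent.RateLimiter;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.ProcessFunction;
> import org.apache.flink.util.Collector;
>
> public class RateLimitingFunction<T> extends ProcessFunction<T, T> {
>
>     private final double permitsPerSecond;
>     private transient RateLimiter rateLimiter;
>
>     public RateLimitingFunction(double permitsPerSecond) {
>         this.permitsPerSecond = permitsPerSecond;
>     }
>
>     @Override
>     public void open(Configuration parameters) {
>         rateLimiter = RateLimiter.create(permitsPerSecond);
>     }
>
>     @Override
>     public void processElement(T value, Context ctx, Collector<T> out) {
>         // Blocks when the rate is exceeded, which back-pressures the
>         // upstream WindowOperator and stretches its burst over wall time.
>         rateLimiter.acquire();
>         out.collect(value);
>     }
> }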
>
> Btw, with your proposal, with what event time do you want to emit the
> delayed data? If the event time of the produced records changes based on
> using/not using window offsets, this can cause quite a lot of semantic
> problems and side effects for the downstream operators.
>
> Piotrek
>
> On 28 Sep 2018, at 15:18, Rong Rong <walter...@gmail.com> wrote:
>
> Hi Piotrek,
>
> Thanks for getting back to me so quickly. Let me explain.
>
> Re 1). As I explained in the doc, we are using a basic Kafka-in, Kafka-out
> system with the same partition count on both sides. The burst is causing
> degraded performance in external network I/O.
> It is definitely possible to configure more resources (e.g. a larger
> partition count) for the output to handle the burst, but it can also be
> resolved through some sort of internal smoothing (either through rate
> limiting as you suggested, or through the dynamic offset).
>
> Re 2a). Yes, I agree and I think I understand your concern. However, it is
> one simple API addition with default fallbacks that are fully
> backward-compatible (or I think it can be made fully compatible if I
> missed any corner cases).
> Re 2b). Yes, there could be many potential issues that cause data bursts.
> However, putting aside the scenarios caused by the nature of the stream
> (data skew, bursts), which affect both input and output, we want to
> specifically address the case where a smooth input *deterministically*
> results in a bursty output. What we are proposing here is pretty much
> like the case of users' custom operators; however, we can't do so unless
> there's an API to control the offset.
>
> Regarding the problem of rate limiting and skew: I think I missed one key
> point from you, and I think you are right. If we introduce a *new rate
> limiting operator* (with size > 0) it will
> - cause extra state usage within the container (moving all the
> components from the window operator and storing them in the rate-limit
> buffer at window boundaries).
> - not cause the data skew problem: the data skew problem I mentioned is
> that, if data is buffered in window operator state longer for some keys
> but not others, then potentially some panes will handle more late
> arrivals than others.
>
> However, if it is possible to get rid of the extra memory usage, we will
> definitely benchmark the rate-limit approach. Can you be more specific on
> how setting up the rate-limit operator (size = 0) can resolve the burst
> issue?
> If I understand correctly, the back pressure will cause the watermark to
> not advance, but once it crosses the window boundary, there will still be
> a batch of messages emitted out of the window operator at the same time,
> correct?
>
> Thanks,
> Rong
>
>
>
> On Fri, Sep 28, 2018 at 1:25 AM Piotr Nowojski <pi...@data-artisans.com>
> wrote:
>
> Hi,
>
> Re 1. Can you be more specific? What system are you using, what’s
> happening, and how does it break?
>
> While delaying window firing is probably the most cost-effective solution
> for this particular problem, it has some disadvantages:
> a) putting even more logic into an already complicated component
> b) not solving potentially similar problems. I can easily imagine the
> same issue happening in scenarios other than "interval based operators”,
> such as:
>       - input sources faster than output sinks
>       - data skew
>       - data bursts
>       - users' custom operators causing data bursts
>       - users’ custom operators being prone to bursts (maybe something
> like AsyncOperator or something else that works with an external
> system) - so the problem might not necessarily be limited to the sinks
>
> As far as I recall, there were some users reporting similar issues.
>
> Regarding potential drawbacks of rate limiting, I didn’t understand this
> part:
>
> However the problem is similar to delay triggers, which can provide
> degraded performance for skew-sensitive downstream services, such as
> feeding feature extraction results to a deep learning model.
>
>
> The way I could imagine RateLimitingOperator is that it could take two
> parameters: a rate limit and a buffer size limit.
>
> With buffer size = 0, it would immediately cause back pressure if the
> rate is exceeded.
> With buffer size > 0, it would first buffer events in state and only
> cause back pressure when reaching the max buffer size.
>
> For the case with the WindowOperator, if windows are evicted and removed
> from the state, using buffer size > 0 wouldn’t cause increased state
> usage; it would only move the state from the WindowOperator to the
> RateLimitingOperator.
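>
> A rough sketch of the buffered mode (hypothetical names; blocking once
> the buffer itself reaches its maximum size is omitted for brevity):
>
> import java.util.ArrayList;
> import java.util.List;
> import com.google.common.util.concurrent.RateLimiter;
> import org.apache.flink.api.common.state.ListState;
> import org.apache.flink.api.common.state.ListStateDescriptor;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
> import org.apache.flink.util.Collector;
>
> public class BufferedRateLimitingFunction<K, T> extends KeyedProcessFunction<K, T, T> {
>
>     private final double permitsPerSecond;
>     private final TypeInformation<T> typeInfo;
>     private transient RateLimiter rateLimiter;
>     private transient ListState<T> buffer;
>
>     public BufferedRateLimitingFunction(double permitsPerSecond, TypeInformation<T> typeInfo) {
>         this.permitsPerSecond = permitsPerSecond;
>         this.typeInfo = typeInfo;
>     }
>
>     @Override
>     public void open(Configuration parameters) {
>         rateLimiter = RateLimiter.create(permitsPerSecond);
>         buffer = getRuntimeContext().getListState(new ListStateDescriptor<>("buffer", typeInfo));
>     }
>
>     @Override
>     public void processElement(T value, Context ctx, Collector<T> out) throws Exception {
>         if (rateLimiter.tryAcquire()) {
>             out.collect(value);
>         } else {
>             // Park the element in keyed state instead of blocking.
>             buffer.add(value);
>             ctx.timerService().registerProcessingTimeTimer(
>                     ctx.timerService().currentProcessingTime() + 100);
>         }
>     }
>
>     @Override
>     public void onTimer(long timestamp, OnTimerContext ctx, Collector<T> out) throws Exception {
>         // Drain as much of the buffer as the rate allows; retry shortly.
>         List<T> remaining = new ArrayList<>();
>         for (T value : buffer.get()) {
>             if (rateLimiter.tryAcquire()) {
>                 out.collect(value);
>             } else {
>                 remaining.add(value);
>             }
>         }
>         buffer.update(remaining);
>         if (!remaining.isEmpty()) {
>             ctx.timerService().registerProcessingTimeTimer(
>                     ctx.timerService().currentProcessingTime() + 100);
>         }
>     }
> }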
>
> Piotrek
>
> On 27 Sep 2018, at 17:28, Rong Rong <walter...@gmail.com> wrote:
>
> Hi Piotrek,
>
> Yes, to be more clear:
> 1) the network I/O issue I am referring to is between Flink and the
> external sink. We did not see issues between operators.
> 2) yes, we've considered rate-limiting sink functions as well, which is
> also mentioned in the doc, along with some of the pros and cons we
> identified.
>
> This kind of problem seems to only occur in the WindowOperator so far,
> but yes, it can probably occur in any aligned interval-based operator.
>
> --
> Rong
>
> On Wed, Sep 26, 2018 at 11:44 PM Piotr Nowojski <pi...@data-artisans.com>
> wrote:
>
> Hi,
>
> Thanks for the proposal. Could you provide more
> background/explanation/motivation for why you need such a feature? What
> do you mean by “network I/O” degradation?
>
> On their own, burst writes shouldn’t cause problems within Flink. If they
> do, we might want to fix the original underlying problem, and if they are
> causing problems in external systems, we also might think about other
> approaches to fix/handle the problem (write rate limiting?), which might
> be more general and not fix only the bursts originating from the
> WindowOperator.
>
> I’m not saying that your proposal is bad or anything, but I would just
> like to have more context :)
>
> Piotrek.
>
> On 26 Sep 2018, at 19:21, Rong Rong <walter...@gmail.com> wrote:
>
> Hi Dev,
>
> I was wondering if there's any previous discussion regarding how to
> handle burst network I/O when deploying Flink applications with window
> operators.
>
> We've recently seen some significant network I/O degradation when trying
> to use sliding windows to perform rolling aggregations. The pattern is
> very periodic: output connections get no traffic for a period of time
> until a burst at window boundaries (in our case every 5 minutes).
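>
> For reference, the job is essentially shaped like this (names and window
> sizes are illustrative):
>
> stream
>     .keyBy(record -> record.getKey())
>     .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)))
>     .aggregate(new RollingAggregator())
>     .addSink(kafkaProducer);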
>
> We have drafted a doc
> <
> https://docs.google.com/document/d/1fEhbcRgxxX8zFYD_iMBG1DCbHmTcTRfRQFXelPhMFiY/edit?usp=sharing
> >
> on how we propose to handle it and smooth the output traffic spikes.
> Please kindly take a look; any comments and suggestions are highly
> appreciated.
>
>
> --
> Rong