Absolutely. But without a view into a global WM at the source level, the suggestion --> sources would have to wait for sources that are "slower in time" --> is not possible for folks creating custom sources or extending existing ones. I would have loved to use a more scientific/data-driven approach to slowing down the aggressive consumers, but without a fundamental API change through which sources know the current global WM, it is not possible. Further, resources are at a premium; if I can and should be able to execute the expected rqs, a replay and a real-time run should not show differing characteristics IMHO.

The more I work with Flink, the more I realize that the crux of Flink is WM generation (if we are doing event processing), and I would want the current WM at an operator to be exposed at all places feasible. I have another email thread on why WM-driven preemptive analysis of an ordered list of events within a window is desirable, but it could not be done using the natural API (accumulator based) because of the paucity of a WM view. Anyway, it does work as of now, but I know it is an interim solution.
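(For concreteness, a minimal sketch of that interim WM access, assuming the 1.4-era ProcessFunction, whose TimerService does expose the operator's current watermark. The String event type is a placeholder, and event-time timers require a keyed stream:)

    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;

    public class WatermarkAwareFn extends ProcessFunction<String, String> {

        @Override
        public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
            // The operator's current WM: the view the accumulator-based window API lacks.
            long currentWm = ctx.timerService().currentWatermark();
            Long ts = ctx.timestamp();
            if (ts != null && ts <= currentWm) {
                // The WM has already passed this element; safe to process it now.
                out.collect(value);
            } else if (ts != null) {
                // Defer until the WM catches up to the element's timestamp.
                ctx.timerService().registerEventTimeTimer(ts);
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            // Fires once the WM reaches `timestamp`; emit any deferred results here.
        }
    }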
Regards,

Vishal

On Wed, Jan 3, 2018 at 9:55 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi,
>
> I think your analysis is very thorough and accurate. However, I don't think that https://issues.apache.org/jira/browse/FLINK-7282 will solve this problem. We're dealing with "time back-pressure" here and not traditional processing back-pressure. (Not sure if there's a term for this so I coined a new one... 😉). The problem is that some Kafka consumers progress faster in event time than others, which leads to windows being spawned at the window operator that only trigger (and get deleted) once all sources catch up to that point in event time.
>
> Simply slowing down all reading mitigates this somewhat, but a proper solution would require sources to wait for sources that are "slower in time".
>
> Does that make sense to you?
>
> Best,
> Aljoscha
>
>
> On 3. Jan 2018, at 15:45, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>
> To add, an interim solution to the issue:
>
> I extended a source, based on the advice "custom source/adapt an existing source", and put in a RateLimiter (guava) that effectively put a cap on each kafka consumer (x times the expected incident rqs). That solved the issue, in that it stabilized the flow into the downstream window operation (I use ProcessFunction for sessionizing, and why is another discussion).
>
> This leads me to these conclusions, and any of them could be correct:
>
> * The kafka partitions are on different brokers (the leaders), and depending on how efficient a broker is (whether it has data in the OS cache, or whether there is a skew in leader distribution and thus more stress on n of m brokers), the consumption speed can vary.
> * With a skew of number of partitions to number of flink nodes (if no. of partitions % no. of flink nodes == 0 there is no skew), the consumption rate can vary.
> * Some TM nodes may be sluggish.
>
> Either way, any of the above can cause data to be consumed at different speeds, which can lead to an imbalance, and because of the paucity of fine-grained back-pressure handling that leaves more windows open than is prudent, windows that reflect the more aggressive consumption (and thus more variance from the current WM), causing the pathological case described above. By regulating the consumption rate (I put the rate limiter in the extractTimestamp method), the more aggressive consumers were effectively held to a fixed upper bound, making the rate of consumption across consumers effectively similar.
>
> Either way, it seems imperative that https://issues.apache.org/jira/browse/FLINK-7282 should be finalized at the earliest. The consequences on a shared flink cluster are huge IMHO.
>
> Please tell me if my conclusions are problematic or do not make sense.
>
> Regards,
>
> Vishal
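(A minimal sketch of the rate-limited extractor described above, assuming Guava's RateLimiter and the 1.4-era BoundedOutOfOrdernessTimestampExtractor. The Tuple2 event type, the permit rate, and the out-of-orderness bound are placeholders:)

    import com.google.common.util.concurrent.RateLimiter;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;

    // Events are (key, eventTimeMillis) pairs, purely for illustration.
    public class ThrottledExtractor extends BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>> {

        private final double permitsPerSecond;   // "x times the expected incident rqs"
        private transient RateLimiter limiter;   // not serializable, so created lazily per subtask

        public ThrottledExtractor(double permitsPerSecond) {
            super(Time.minutes(1));              // placeholder out-of-orderness bound
            this.permitsPerSecond = permitsPerSecond;
        }

        @Override
        public long extractTimestamp(Tuple2<String, Long> element) {
            if (limiter == null) {
                limiter = RateLimiter.create(permitsPerSecond);
            }
            limiter.acquire();                   // blocks, capping this consumer's throughput
            return element.f1;
        }
    }

Attached to each Kafka source via assignTimestampsAndWatermarks, this holds every consumer subtask to the same fixed ceiling, which is what makes the consumption rates across consumers "effectively similar".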
>
> On Tue, Jan 2, 2018 at 3:04 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>
>> Also note that if I were to start 2 pipelines
>>
>> 1. Working off the head of the topic, and thus not prone to the pathological case described above
>> 2. Doing a replay, and thus prone to the pathological case described above
>>
>> then the 2nd pipeline will stall the 1st. This seems to point to
>>
>> - All channels multiplexed into the same TCP connection stall together, as soon as one channel has backpressure.
>>
>> of the jira issue. This has to be a priority IMHO, in a shared VM where jobs should have at least some isolation.
>>
>> On Tue, Jan 2, 2018 at 2:19 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>
>>> Thank you.
>>>
>>> On Tue, Jan 2, 2018 at 1:31 PM, Nico Kruber <n...@data-artisans.com> wrote:
>>>
>>>> Hi Vishal,
>>>> let me already point you towards the JIRA issue for the credit-based flow control: https://issues.apache.org/jira/browse/FLINK-7282
>>>>
>>>> I'll have a look at the rest of this email thread tomorrow...
>>>>
>>>> Regards,
>>>> Nico
>>>>
>>>> On 02/01/18 17:52, Vishal Santoshi wrote:
>>>> > Could you please point me to any documentation on the "credit-based flow control" approach....
>>>> >
>>>> > On Tue, Jan 2, 2018 at 10:35 AM, Timo Walther <twal...@apache.org> wrote:
>>>> >
>>>> > Hi Vishal,
>>>> >
>>>> > your assumptions sound reasonable to me. The community is currently working on more fine-grained back pressure handling with credit-based flow control. It is on the roadmap for 1.5 [1]/[2]. I will loop in Nico, who might tell you more about the details. Until then, I guess you have to implement a custom source/adapt an existing source to let the data flow in more realistically.
>>>> >
>>>> > Regards,
>>>> > Timo
>>>> >
>>>> > [1] http://flink.apache.org/news/2017/11/22/release-1.4-and-1.5-timeline.html
>>>> > [2] https://www.youtube.com/watch?v=scStdhz9FHc
>>>> >
>>>> > Am 1/2/18 um 4:02 PM schrieb Vishal Santoshi:
>>>> >
>>>> > I did a simulation on session windows (in 2 modes) and let it rip for about 12 hours:
>>>> >
>>>> > 1. Replay, where a kafka topic with a retention of 7 days was the source (earliest)
>>>> > 2. Starting the pipe off the head of the kafka topic (latest)
>>>> >
>>>> > I saw results that differed dramatically.
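(For reference, the two start positions above, as they would look in the job's main method with the 1.4-era Kafka consumer; the topic name, schema, and broker address are placeholders:)

    import java.util.Properties;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "broker:9092");
    props.setProperty("group.id", "session-sim");

    FlinkKafkaConsumer011<String> consumer =
        new FlinkKafkaConsumer011<>("events", new SimpleStringSchema(), props);

    consumer.setStartFromEarliest();   // mode 1: replay against the 7-day retention
    // consumer.setStartFromLatest();  // mode 2: work off the head of the topic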
>>>> >
>>>> > On replay, the pipeline stalled after a good ramp-up, while in the second case the pipeline hummed on without issues. For the same time period, the data consumed was significantly more in the second case, with the WM progression stalled in the first case with no hint of resolution (the incoming data on the source topic far outstrips the WM progression). I think I know the reasons, and this is my hypothesis.
>>>> >
>>>> > In replay mode, the number of open windows does not have an upper bound. While buffer exhaustion (and data in flight with the watermark) is what throttles, it does not really limit the open windows and in fact creates windows that reflect futuristic data (the future being relative to the current WM). So if partition x has data for watermark time t(x) and partition y for watermark time t(y), with t(x) << t(y), where the overall watermark is t(x), nothing significantly throttles consumption from the y partition (in fact for x too); the bounded-buffer approach does not give minute control AFAIK, as one would hope, and that implies there are far more open windows than the system can handle. That leads to the pathological case where the buffers fill up (I believe that happens way too late) and throttling occurs, but the WM does not proceed, and the windows that could ease the glut cannot fire because of the throttling. In replay mode the amount of data implies that the Fetchers keep pulling data at the maximum consumption allowed by the open-ended buffer approach.
>>>> >
>>>> > My question thus is: is there any way to have finer control of back pressure, wherein the consumption from a source is throttled preemptively (by, for example, decreasing the buffers associated with a pipe, or the size allocated), or sleeps in the Fetcher code that could help align the performance with real-time consumption characteristics?
>>>> >
>>>> > Regards,
>>>> >
>>>> > Vishal.
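(A hedged sketch of the preemptive, event-time-aware throttle the question asks for. The crucial input, a view of the global minimum watermark, is exactly what the source API does not provide, which is the point of this thread, so the LongSupplier below is entirely hypothetical:)

    import java.util.function.LongSupplier;

    // Hypothetical: block a source whose event time runs too far ahead of a
    // global minimum watermark that Flink does not actually expose to sources.
    public class EventTimeThrottle {

        private final long maxLeadMillis;               // allowed event-time lead over the slowest source
        private final LongSupplier globalMinWatermark;  // hypothetical global WM view

        public EventTimeThrottle(long maxLeadMillis, LongSupplier globalMinWatermark) {
            this.maxLeadMillis = maxLeadMillis;
            this.globalMinWatermark = globalMinWatermark;
        }

        // Call before emitting an element: sleeps while this source is "too far in
        // the future", i.e. it makes fast sources wait for sources slower in time.
        public void maybeBlock(long elementTimestamp) throws InterruptedException {
            while (elementTimestamp > globalMinWatermark.getAsLong() + maxLeadMillis) {
                Thread.sleep(50L);
            }
        }
    }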