Absolutely.

       But without a a view into a global WM at the source level, the --> would
require sources to wait for sources that are "slower in time" -->  is not
possible for folks creating custom sources on extending existing ones. I
would have loved to use a more scientific/data drive approach to slow down
the aggressive consumers but   without an fundamental API change where
sources know the current global WM it is not possible . Further resources
are at a premium as in if I can and should be able to execute should rqs, a
replay or a real time should not show differing characteristics IMHO.

The more I work with Flink, the more I realize that the crux of Flink is WM
generation ( if we are doing Event Processing ) and I would want the
current WM at an Operator  to be exposed at all places feasible. I have
another email thread on why WM driven preemptive analysis of an ordered
list of Events within a Window is desirable but could not do it using the
natural API ( Accumulator based ) b'coz of paucity of WM view.

Anywaz it does work as on now but I know it is an interim solution.

Regards

Vishal

On Wed, Jan 3, 2018 at 9:55 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
>
> I think your analysis is very thorough and accurate. However, I don't
> think that https://issues.apache.org/jira/browse/FLINK-7282 will solve
> this problem. We're dealing with "time back-pressure" here and not
> traditional processing back-pressure. (Not sure if there's a term for this
> so I coined a new one... 😉). The problem is that some Kafka consumers
> progress faster in event-time than others which leads to windows being
> spawned at the window operator that only trigger (and get deleted) once all
> sources catch up to that point in event time.
>
> Simply slowing down all reading mitigates this somewhat but a proper
> solution would require sources to wait for sources that are "slower in
> time".
>
> Does that make sense to you?
>
> Best,
> Aljoscha
>
>
> On 3. Jan 2018, at 15:45, Vishal Santoshi <vishal.santo...@gmail.com>
> wrote:
>
> To add and an interim solution to the issue.
>
> I extended the based on the advise "custom source/adapt an existing
> source" and put in a RateLimiter ( guava ) that effectively put a cap on
> each kafka consumer  ( x times the expected incident rqs ). That solved the
> issue  as in it stabilized the flow into down stream window operation ( I
> use ProcessFunction for sessionizing and why is another discussion ) .
>
>  This leads me to these conclusions and either of them could be correct
>
>
> * The kakfa partitions are on different brokers ( the leaders ) and based
> on the how efficient the broker is ( whether it has data in OS Cache  or
> whether there is a skew in leader distribution and thus more stress on a n
> of m brokers) the consumption speed can vary.
> * The skew caused by number of consumers to number of flink nodes ( if no.
> of partitions % no of flink nodes == 0 there is no skew ) the consumption
> rate can vary.
> * Some TM nodes may be sluggish.
>
> Either ways any of the above reasons can cause data to be consumed at
> different speeds which could lead to an imbalance and b'coz of the paucity
> of fine grained back pressure handling leads to more windows that remain
> open, windows that reflect the more aggressive consumption ( and thus more
> variance from the current WM)  than prudent, causing the pathological case
> described above. By regulating the consumption rate ( I put the delimiter
> in the extractTimestamp method ) , it effectively caused the more
> aggressive consumptions to a fixed upper bound, making the rate of
> consumption across consumers effectively similar.
>
> Either ways it seems imperative that https://issues.apache.
> org/jira/browse/FLINK-7282 should be finalized at the earliest. The
> consequences on a shared flink cluster are too huge IMHO.
>
> Please tell me if my conclusions are problematic or do not make sense.
>
>
> Regards
>
> Vishal
>
>
>
>
>
>
> On Tue, Jan 2, 2018 at 3:04 PM, Vishal Santoshi <vishal.santo...@gmail.com
> > wrote:
>
>> Also note that if I were to start 2 pipelines
>>
>> 1. Working off the head of the topic and thus not prone to the
>> pathological case described above
>> 2. Doing a replay and thus prone to the  pathological case described above
>>
>> Than the 2nd pipe will stall the 1st pipeline. This seems to to point to
>>
>>    - All channels multiplexed into the same TCP connection stall
>>    together, as soon as one channel has backpressure.
>>
>>
>> of the jira issue. This has to be a priority IMHO, in a shared VM where
>> jobs should have at least some isolation.
>>
>> On Tue, Jan 2, 2018 at 2:19 PM, Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> Thank you.
>>>
>>> On Tue, Jan 2, 2018 at 1:31 PM, Nico Kruber <n...@data-artisans.com>
>>> wrote:
>>>
>>>> Hi Vishal,
>>>> let me already point you towards the JIRA issue for the credit-based
>>>> flow control: https://issues.apache.org/jira/browse/FLINK-7282
>>>>
>>>> I'll have a look at the rest of this email thread tomorrow...
>>>>
>>>>
>>>> Regards,
>>>> Nico
>>>>
>>>> On 02/01/18 17:52, Vishal Santoshi wrote:
>>>> > Could you please point me to any documentation on the  "credit-based
>>>> > flow control" approach....
>>>> >
>>>> > On Tue, Jan 2, 2018 at 10:35 AM, Timo Walther <twal...@apache.org
>>>> > <mailto:twal...@apache.org>> wrote:
>>>> >
>>>> >     Hi Vishal,
>>>> >
>>>> >     your assumptions sound reasonable to me. The community is
>>>> currently
>>>> >     working on a more fine-grained back pressuring with credit-based
>>>> >     flow control. It is on the roamap for 1.5 [1]/[2]. I will loop in
>>>> >     Nico that might tell you more about the details. Until then I
>>>> guess
>>>> >     you have to implement a custom source/adapt an existing source to
>>>> >     let the data flow in more realistic.
>>>> >
>>>> >     Regards,
>>>> >     Timo
>>>> >
>>>> >     [1]
>>>> >     http://flink.apache.org/news/2017/11/22/release-1.4-and-1.5
>>>> -timeline.html
>>>> >     <http://flink.apache.org/news/2017/11/22/release-1.4-and-1.
>>>> 5-timeline.html>
>>>> >     [2] https://www.youtube.com/watch?v=scStdhz9FHc
>>>> >     <https://www.youtube.com/watch?v=scStdhz9FHc>
>>>> >
>>>> >
>>>> >     Am 1/2/18 um 4:02 PM schrieb Vishal Santoshi:
>>>> >
>>>> >         I did a simulation on session windows ( in 2 modes ) and let
>>>> it
>>>> >         rip for about 12 hours
>>>> >
>>>> >         1. Replay where a kafka topic with retention of 7 days was the
>>>> >         source ( earliest )
>>>> >         2. Start the pipe with kafka source ( latest )
>>>> >
>>>> >         I saw results that differed dramatically.
>>>> >
>>>> >         On replay the pipeline stalled after  good ramp up while in
>>>> the
>>>> >         second case the pipeline hummed on without issues. For the
>>>> same
>>>> >         time period the data consumed is significantly more in the
>>>> >         second case with the WM progression stalled in the first case
>>>> >         with no hint of resolution ( the incoming data on source topic
>>>> >         far outstrips the WM progression )  I think I know the reasons
>>>> >         and this is my hypothesis.
>>>> >
>>>> >         In replay mode the number of windows open do not have an upper
>>>> >         bound. While buffer exhaustion ( and data in flight with
>>>> >         watermark )  is the reason for throttle, it does not really
>>>> >         limit the open windows and in fact creates windows that
>>>> reflect
>>>> >         futuristic data ( future is relative to the current WM ) . So
>>>> if
>>>> >         partition x has data for watermark time t(x) and partition y
>>>> for
>>>> >         watermark time t(y) and t(x) << t(y) where the overall
>>>> watermark
>>>> >         is t(x) nothing significantly throttles consumption from the y
>>>> >         partition ( in fact for x too ) , the bounded buffer based
>>>> >         approach does not give minute control AFAIK as one would hope
>>>> >         and that implies there are far more open windows than the
>>>> system
>>>> >         can handle and that leads to the pathological case where the
>>>> >         buffers fill up  ( I believe that happens way late ) and
>>>> >         throttling occurs but the WM does not proceed and windows that
>>>> >         could ease the glut the throttling cannot proceed..... In the
>>>> >         replay mode the amount of data implies that the Fetchers keep
>>>> >         pulling data at the maximum consumption allowed by the open
>>>> >         ended buffer approach.
>>>> >
>>>> >         My question thus is, is there any way to have a finer control
>>>> of
>>>> >         back pressure, where in the consumption from a source is
>>>> >         throttled preemptively ( by for example decreasing the buffers
>>>> >         associated for a pipe or the size allocated ) or sleeps in the
>>>> >         Fetcher code that can help aligning the performance to have
>>>> real
>>>> >         time consumption  characteristics
>>>> >
>>>> >         Regards,
>>>> >
>>>> >         Vishal.
>>>> >
>>>> >
>>>> >
>>>> >
>>>> >
>>>> >
>>>>
>>>>
>>>
>>
>
>

Reply via email to