I think you could actually do a window operation to get the tDigestStream
from windowMetricsByIp:

windowMetricsByIp.windowAll(SameWindowAsTumblingTimeWindow).fold(...)

This way, the watermark mechanism should ensure that you get all the partial
(per-IP) results before the global window fires.
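Here is a minimal sketch of why this should work. It is plain Java rather than the Flink API. The class `WatermarkWindowAll` and its methods are hypothetical, and an exact sorted list stands in for the TDigest.

Flink stamps each window result with the window's last timestamp (end - 1). So the per-IP results for window [0, 1000) carry timestamp 999. An event-time all-window of the same size only fires once the watermark reaches 999, which means after those results have passed through.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model of an event-time windowAll over tumbling
// windows (not the Flink API): values are buffered per window, and a window
// fires only once the watermark reaches its last timestamp (end - 1).
class WatermarkWindowAll {
    private final long size; // window size in milliseconds
    // window start -> values received so far (sorted on firing; exact stand-in for a TDigest)
    private final TreeMap<Long, List<Double>> open = new TreeMap<>();

    WatermarkWindowAll(long size) {
        this.size = size;
    }

    // Assign a value to its tumbling window [start, start + size).
    void onElement(long timestamp, double value) {
        long start = timestamp - Math.floorMod(timestamp, size);
        open.computeIfAbsent(start, k -> new ArrayList<>()).add(value);
    }

    // Fire, in order, every window whose last timestamp is covered by the watermark.
    Map<Long, List<Double>> onWatermark(long watermark) {
        Map<Long, List<Double>> fired = new TreeMap<>();
        while (!open.isEmpty() && open.firstKey() + size - 1 <= watermark) {
            Map.Entry<Long, List<Double>> window = open.pollFirstEntry();
            List<Double> values = window.getValue();
            Collections.sort(values);
            fired.put(window.getKey(), values);
        }
        return fired;
    }
}
```

For example, with `new WatermarkWindowAll(1000)`, values at timestamps 999 and 1999 stay buffered at watermark 500. A watermark of 999 then fires only the window starting at 0.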

Gyula

On Tue, 30 May 2017 at 15:03, William Saar <will...@saar.se> wrote:

> > This logic now assumes that you get the TDigest result before getting
> any groupBy metric, which will probably not be the case, so you could do
> some custom buffering in state. Depending on the rate of the stream, this
> might or might not be feasible :)
>
> Unfortunately, I think this assumption is a deal-breaker. The value stream
> is not grouped, but I need to distribute the values to compute the metrics,
> and I am populating the TDigest with the metrics.
>
> Your suggestion gave me some ideas. Assume I have the following three lines:
>     windowMetricsByIp = values.keyBy(ip).window(TumblingTimeWindow).fold(computeMetrics)
>     tDigestStream = windowMetricsByIp.global().flatMap(tDigestMapper) // How do I know when the flat map has seen all values and should emit its result?
>     percentilesStream = tDigestStream.broadcast().connect(windowMetricsByIp).flatMap
>
> If I attach information about the current window to the metrics events on
> line 1, can I use that information to make the flatMap on line 2 decide
> when to emit its T-Digest? The crudest solution is to emit the T-Digest for
> a window when the first event of the next window arrives (will this cause
> problems with back-pressure?).
> A less crude option: maybe I can store watermark information or something
> similar on the metrics objects in line 1 and emit T-Digests more often in
> line 2?
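The crude approach could look roughly like this single stateful operator. It is plain Java, not a Flink API. The class `EmitOnNextWindow` is hypothetical, and a sorted list again stands in for the T-Digest.

The sketch makes two assumptions:
- every event carries a window id, such as the window start;
- the operator runs with parallelism 1 and sees window ids in non-decreasing order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of the "crude" trigger (plain Java, not a Flink API):
// one stateful operator with parallelism 1 collects values for the current
// window and emits that window when the first value of a later window arrives.
class EmitOnNextWindow {
    // A completed window and its sorted values (exact stand-in for a T-Digest).
    static final class Closed {
        final long windowId;
        final List<Double> sortedValues;

        Closed(long windowId, List<Double> sortedValues) {
            this.windowId = windowId;
            this.sortedValues = sortedValues;
        }
    }

    private long currentWindow = Long.MIN_VALUE; // no window seen yet
    private List<Double> buffer = new ArrayList<>();

    // Called once per metrics event tagged with its window id.
    Optional<Closed> process(long windowId, double value) {
        Optional<Closed> out = Optional.empty();
        if (windowId < currentWindow) {
            // A value for a window that was already emitted: this scheme cannot
            // handle it (possible when several upstream subtasks interleave).
            throw new IllegalStateException("late value for window " + windowId);
        }
        if (windowId > currentWindow) {
            if (!buffer.isEmpty()) {
                Collections.sort(buffer);
                out = Optional.of(new Closed(currentWindow, buffer));
            }
            currentWindow = windowId;
            buffer = new ArrayList<>();
        }
        buffer.add(value);
        return out;
    }
}
```

Two weaknesses are visible here:
- The last window is only emitted once some later value arrives.
- When several parallel upstream subtasks feed the global operator, a value for window N can arrive after one for window N+1. The sketch can only reject such a value.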
>
> Finally, how do I access the watermark/window information in my fold
> operation in line 1?
>
> Thanks!
>
>
> ----- Original Message -----
> From: "Gyula Fóra" <gyula.f...@gmail.com>
> To: "William Saar" <will...@saar.se>, <user@flink.apache.org>
> Cc:
> Sent: Tue, 30 May 2017 08:56:28 +0000
> Subject: Re: Porting batch percentile computation to streaming window
>
> Hi William,
>
> I think the feature you are basically looking for is side inputs, which are
> not implemented yet, but let me try to give a workaround that might work.
>
> If I understand correctly, you have two windowed computations:
> TDigestStream = allMetrics.windowAll(...).reduce()
> windowMetricsByIP = allMetrics.keyBy(ip).window(...).reduce()
>
> And now you want to join these two by window to compute the percentiles,
> something like:
>
>
> TDigestStream.broadcast().connect(windowMetricsByIP).flatMap(JoiningCoFlatMap)
>
> In your JoiningCoFlatMap, you could keep a Map<Window, TDigest> in state,
> and every per-IP metric aggregate could pick up the TDigest for the current
> window. All this assumes that you attach the window information to the
> aggregate metrics and the TDigest (you can do this in the window reduce
> step).
>
> This logic now assumes that you get the TDigest result before getting any
> groupBy metric, which will probably not be the case, so you could do some
> custom buffering in state. Depending on the rate of the stream, this might
> or might not be feasible :)
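A minimal plain-Java model of this JoiningCoFlatMap, including the buffering, could look like the sketch below. It does not use Flink's `CoFlatMapFunction` interface. The class, its `IpMetric` type and the `"window,ip,rank"` output format are all hypothetical, and an exact sorted `double[]` stands in for the TDigest.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of the JoiningCoFlatMap (not the Flink
// CoFlatMapFunction API). Input 1 is the broadcast global digest per window;
// input 2 is per-IP metric aggregates tagged with their window. An exact
// sorted double[] stands in for the TDigest.
class JoiningCoFlatMap {
    static final class IpMetric {
        final long window;
        final String ip;
        final double value;

        IpMetric(long window, String ip, double value) {
            this.window = window;
            this.ip = ip;
            this.value = value;
        }
    }

    private final Map<Long, double[]> digests = new HashMap<>();       // window -> digest
    private final Map<Long, List<IpMetric>> pending = new HashMap<>(); // metrics waiting for a digest

    // Digest for a window arrives: store it and flush metrics buffered for that window.
    List<String> flatMap1(long window, double[] values) {
        double[] sorted = values.clone();
        Arrays.sort(sorted);
        digests.put(window, sorted);
        List<String> out = new ArrayList<>();
        List<IpMetric> waiting = pending.remove(window);
        if (waiting != null) {
            for (IpMetric m : waiting) {
                out.add(join(m, sorted));
            }
        }
        return out;
    }

    // Per-IP metric arrives: join it now, or buffer it until its digest arrives.
    List<String> flatMap2(IpMetric m) {
        List<String> out = new ArrayList<>();
        double[] sorted = digests.get(m.window);
        if (sorted == null) {
            pending.computeIfAbsent(m.window, k -> new ArrayList<>()).add(m);
        } else {
            out.add(join(m, sorted));
        }
        return out;
    }

    // "window,ip,rank" where rank is the fraction of digest values <= the metric.
    private static String join(IpMetric m, double[] sorted) {
        int lo = 0, hi = sorted.length;
        while (lo < hi) { // binary search for the first value > m.value
            int mid = (lo + hi) >>> 1;
            if (sorted[mid] <= m.value) lo = mid + 1; else hi = mid;
        }
        return m.window + "," + m.ip + "," + ((double) lo / sorted.length);
    }
}
```

The sketch never removes a window's digest from the map. In a real job, that state would need some cleanup once no more metrics for the window can arrive, and that part is left out here.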
>
> Does this sound reasonable? I hope I have understood the use case
> correctly.
> Gyula
>
>
> On Mon, 29 May 2017 at 18:34, William Saar <will...@saar.se> wrote:
>
>> I am porting a calculation from Spark batch jobs that uses broadcast
>> variables to compute percentiles from metrics, and I am curious about tips
>> for doing this with Flink streaming.
>>
>> I have a windowed computation where I compute metrics for IP addresses
>> (a windowed stream of metrics objects grouped by IP address). Now I would
>> like to compute percentiles for each IP from the metrics.
>>
>> My idea is to send all the metrics to a node that computes a global
>> TDigest and then rejoins the computed global TDigest with the IP-grouped
>> metrics stream to compute the percentiles for each IP. Is there a neat way
>> to implement this in Flink?
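As a reference for what the streaming version has to reproduce for each window, here is a minimal batch sketch of the computation described above, in plain Java. The class `BatchPercentiles` is hypothetical. Summing values per IP stands in for the unspecified metric computation, and an exact sorted array stands in for the TDigest.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical batch reference for one window (plain Java): aggregate a metric
// per IP, build the global distribution of those metrics (the value Spark would
// broadcast), then look up each IP's percentile rank in it. Summing values
// stands in for the unspecified metric computation; an exact sorted array
// stands in for the TDigest.
class BatchPercentiles {
    static Map<String, Double> percentileRanks(String[] ips, double[] values) {
        // 1. Group by IP and aggregate (the keyBy(ip) + window step, per window).
        Map<String, Double> metricByIp = new TreeMap<>();
        for (int i = 0; i < ips.length; i++) {
            metricByIp.merge(ips[i], values[i], Double::sum);
        }
        // 2. Global distribution of the per-IP metrics (the broadcast TDigest).
        double[] global = metricByIp.values().stream().mapToDouble(Double::doubleValue).toArray();
        Arrays.sort(global);
        // 3. Join every IP's metric with the global distribution.
        Map<String, Double> ranks = new TreeMap<>();
        for (Map.Entry<String, Double> e : metricByIp.entrySet()) {
            ranks.put(e.getKey(), rank(global, e.getValue()));
        }
        return ranks;
    }

    // Fraction of sorted values <= x.
    static double rank(double[] sorted, double x) {
        int lo = 0, hi = sorted.length;
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (sorted[mid] <= x) lo = mid + 1; else hi = mid;
        }
        return (double) lo / sorted.length;
    }
}
```

The only difference in the streaming version is step 2. There, the global distribution is not available until every per-IP result for the window has been seen, which is exactly the completeness question raised in this thread.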
>>
>> I am curious about the best way to join a global value, like our TDigest,
>> with every result of a grouped window stream. Also, how do I know when the
>> TDigest is complete and has seen every element in the window (say, if I
>> implement it in a stateful flatMap that emits the value after seeing all
>> stream values)?
>>
>> Thanks!
>>
>> William
>>
>
