I think you could actually do a window operation to get the tDigestStream from windowMetricsByIp:
windowMetricsByIp.windowAll(SameWindowAsTumblingTimeWindow).fold(...)

This way, the watermark mechanism should ensure that you get all partial results before the global window is flushed.

Gyula

William Saar <will...@saar.se> wrote (on Tue, 30 May 2017, 15:03):

>> This logic now assumes that you get the TDigest result before getting
>> any groupBy metric, which will probably not be the case, so you could do
>> some custom buffering in state. Depending on the rate of the stream, this
>> might or might not be feasible :)
>
> Unfortunately, I think this assumption is a deal-breaker. The value stream
> is not grouped, but I need to distribute the values to compute the metrics,
> and I am populating the TDigest with the metrics.
>
> Your suggestion gave me some ideas. Assume I have
>
> windowMetricsByIp = values.keyBy(ip).window(TumblingTimeWindow).fold(computeMetrics)
> tDigestStream = windowMetricsByIp.global().flatMap(tDigestMapper) // How do I know when the flatMap has seen all values and should emit its result?
> percentilesStream = tDigestStream.broadcast().connect(windowMetricsByIp).flatMap
>
> If I attach information about the current window to the metrics events on
> line 1, can I perhaps use that information to make the flatMap on line 2
> decide when to emit its T-Digest? The crudest solution is to emit the
> T-Digest for a window when the first event of the next window arrives
> (will this cause problems with back-pressure?).
> A less crude option: maybe I can store watermark information or something
> similar on the metrics objects in line 1 and emit T-Digests more often in
> line 2?
>
> Finally, how do I access the watermark/window information in my fold
> operation in line 1?
>
> Thanks!
>
> ----- Original Message -----
> From: "Gyula Fóra" <gyula.f...@gmail.com>
> To: "William Saar" <will...@saar.se>, <user@flink.apache.org>
> Cc:
> Sent: Tue, 30 May 2017 08:56:28 +0000
> Subject: Re: Porting batch percentile computation to streaming window
>
> Hi William,
>
> I think the feature you are looking for is basically side inputs, which
> are not implemented yet, but let me try to give a workaround that might
> work.
>
> If I understand correctly, you have two windowed computations:
>
> TDigestStream = allMetrics.windowAll(...).reduce()
> windowMetricsByIP = allMetrics.keyBy(ip).window(...).reduce()
>
> And now you want to join these two by window to compute the percentiles,
> something like:
>
> TDigestStream.broadcast().connect(windowMetricsByIP).flatMap(JoiningCoFlatMap)
>
> In your JoiningCoFlatMap you could keep a state of Map<Window, TDigest>,
> and every per-IP metric aggregate could pick up the TDigest for the current
> window. All this assumes that you attach the window information to the
> aggregate metrics and the TDigest (you can do this in the window reduce
> step).
>
> This logic now assumes that you get the TDigest result before getting any
> groupBy metric, which will probably not be the case, so you could do some
> custom buffering in state. Depending on the rate of the stream, this might
> or might not be feasible :)
>
> Does this sound reasonable? I hope I have understood the use case
> correctly.
> Gyula
>
> William Saar <will...@saar.se> wrote (on Mon, 29 May 2017, 18:34):
>
>> I am porting a calculation from Spark batches that uses broadcast
>> variables to compute percentiles from metrics, and I am curious for tips
>> on doing this with Flink streaming.
>>
>> I have a windowed computation where I compute metrics for IP addresses
>> (a windowed stream of metrics objects grouped by IP address). Now I would
>> like to compute percentiles for each IP from the metrics.
>>
>> My idea is to send all the metrics to a node that computes a global
>> TDigest and then rejoins the computed global TDigest with the IP-grouped
>> metrics stream to compute the percentiles for each IP. Is there a neat way
>> to implement this in Flink?
>>
>> I am curious about the best way to join a global value, like our TDigest,
>> with every result of a grouped window stream, and also how to know when
>> the TDigest is complete and has seen every element in the window (say, if
>> I implement it in a stateful flatMap that emits the value after seeing all
>> stream values).
>>
>> Thanks!
>>
>> William
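The bookkeeping Gyula describes for the JoiningCoFlatMap (a per-window map of digests, plus buffering of per-IP results that arrive before their window's digest) can be sketched in plain Java without any Flink dependency. This is a minimal sketch under several assumptions: a window is identified by its end timestamp; an exact sorted array stands in for a real TDigest; "percentile for each IP" is taken to mean the percentile rank of the IP's value within that window's global distribution; and the names `WindowDigestJoin`, `IpMetric`, `IpPercentile`, `onDigest`, `onMetric` and `onWatermark` are hypothetical, not Flink API. In a real job this logic would sit in the co-flatMap's state.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WindowDigestJoin {

    // Hypothetical per-IP window result, tagged with the end of the window it belongs to.
    public record IpMetric(long windowEnd, String ip, double value) {}

    // Hypothetical output: where the IP's value falls in that window's global distribution.
    public record IpPercentile(long windowEnd, String ip, double percentile) {}

    // Stand-in for Map<Window, TDigest>: window end -> sorted global values (exact, not a TDigest).
    private final Map<Long, double[]> digests = new HashMap<>();

    // Per-IP results that arrived before the digest for their window.
    private final Map<Long, List<IpMetric>> pending = new HashMap<>();

    // Input 1 (the broadcast side): the global distribution for one window.
    public List<IpPercentile> onDigest(long windowEnd, double[] globalValues) {
        double[] sorted = globalValues.clone();
        Arrays.sort(sorted);
        digests.put(windowEnd, sorted);
        List<IpPercentile> out = new ArrayList<>();
        // Release any per-IP results that were waiting for this window's digest.
        List<IpMetric> waiting = pending.remove(windowEnd);
        if (waiting != null) {
            for (IpMetric m : waiting) {
                out.add(score(m, sorted));
            }
        }
        return out;
    }

    // Input 2: a per-IP window result; buffered if its window's digest has not arrived yet.
    public List<IpPercentile> onMetric(IpMetric m) {
        double[] sorted = digests.get(m.windowEnd());
        if (sorted == null) {
            pending.computeIfAbsent(m.windowEnd(), k -> new ArrayList<>()).add(m);
            return List.of();
        }
        return List.of(score(m, sorted));
    }

    // Clears state for windows ending at or before the watermark. Assumes no late data,
    // i.e. both inputs have delivered everything for such a window by then.
    public void onWatermark(long watermark) {
        digests.keySet().removeIf(end -> end <= watermark);
        pending.keySet().removeIf(end -> end <= watermark);
    }

    // Percentage of global values that are <= the IP's value (exact rank, no approximation).
    private static IpPercentile score(IpMetric m, double[] sorted) {
        int count = 0;
        for (double v : sorted) {
            if (v <= m.value()) {
                count++;
            }
        }
        return new IpPercentile(m.windowEnd(), m.ip(), 100.0 * count / sorted.length);
    }

    public static void main(String[] args) {
        WindowDigestJoin join = new WindowDigestJoin();
        // The per-IP result arrives first and is buffered.
        System.out.println(join.onMetric(new IpMetric(1000L, "10.0.0.1", 3.0)));
        // The digest for the same window releases it.
        System.out.println(join.onDigest(1000L, new double[] {4.0, 1.0, 3.0, 2.0}));
    }
}
```

Buffering on both sides removes the ordering assumption William calls a deal-breaker: whichever input arrives first for a window simply waits in state. Using the window end as the join key fits Flink's behaviour that event-time window results are emitted with the timestamp `window.maxTimestamp()` (end minus one millisecond), which is also why Gyula's follow-up `windowAll` over the same tumbling size collects all per-IP results of one window together, and why the watermark, rather than "the first event of the next window", is the natural signal that a window is complete.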