Thanks!
On Mon, Jun 13, 2016 at 12:34 PM, Christophe Salperwyck <christophe.salperw...@gmail.com> wrote:
> Hi,
> I voted for this issue and I agree this would be nice to have.
>
> Thx!
> Christophe
>
> 2016-06-13 12:26 GMT+02:00 Aljoscha Krettek <aljos...@apache.org>:
>>
>> Hi,
>> I'm afraid this is currently a shortcoming in the API. There is this open Jira issue to track it: https://issues.apache.org/jira/browse/FLINK-3869. We can't fix it before Flink 2.0, though, because we have to keep the API stable on the Flink 1.x release line.
>>
>> Cheers,
>> Aljoscha
>>
>> On Mon, 13 Jun 2016 at 11:04 Christophe Salperwyck <christophe.salperw...@gmail.com> wrote:
>>>
>>> Thanks for the feedback, and sorry that I can't try all this straight away.
>>>
>>> Is there a way to have a different function than:
>>> WindowFunction<SummaryStatistics, SummaryStatistics, Tuple, TimeWindow>()
>>>
>>> I would like to return an HBase Put and not a SummaryStatistics, so something like this:
>>> WindowFunction<SummaryStatistics, Put, Tuple, TimeWindow>()
>>>
>>> Christophe
>>>
>>> 2016-06-09 17:47 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>>>>
>>>> OK, this indicates that the operator following the source is a bottleneck.
>>>>
>>>> If that's the WindowOperator, it makes sense to try the refactoring of the WindowFunction.
>>>> Alternatively, you can try to run that operator with a higher parallelism.
>>>>
>>>> 2016-06-09 17:39 GMT+02:00 Christophe Salperwyck <christophe.salperw...@gmail.com>:
>>>>>
>>>>> Hi Fabian,
>>>>>
>>>>> Thanks for the help, I will try that. The backpressure was on the source (HBase).
>>>>>
>>>>> Christophe
>>>>>
>>>>> 2016-06-09 16:38 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>>>>>>
>>>>>> Hi Christophe,
>>>>>>
>>>>>> where does the backpressure appear? In front of the sink operator or before the window operator?
>>>>>>
>>>>>> In any case, I think you can improve your WindowFunction if you convert parts of it into a FoldFunction<ANA, SummaryStatistics>.
>>>>>> The FoldFunction would take care of the statistics computation and the WindowFunction would only assemble the result record, including extracting the start time of the window.
>>>>>>
>>>>>> Then you could do:
>>>>>>
>>>>>> ws.apply(new SummaryStatistics(), new YourFoldFunction(), new YourWindowFunction());
>>>>>>
>>>>>> This is more efficient because the FoldFunction is eagerly applied whenever a new element is added to a window. Hence, the window only holds a single value (SummaryStatistics) instead of all elements added to the window. In contrast, the WindowFunction is called when the window is finally evaluated.
>>>>>>
>>>>>> Hope this helps,
>>>>>> Fabian
>>>>>>
>>>>>> 2016-06-09 14:53 GMT+02:00 Christophe Salperwyck <christophe.salperw...@gmail.com>:
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I am writing a program to read time series from HBase and do some daily aggregations (Flink streaming). For now I am just computing some averages, so it is not very resource-intensive, but my HBase reads get slower and slower (I have a few billion points to read). The backpressure is almost always close to 1.
>>>>>>>
>>>>>>> I use custom timestamps:
>>>>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>>>>>
>>>>>>> so I implemented a custom extractor based on AscendingTimestampExtractor.
>>>>>>>
>>>>>>> At the beginning I have 5M reads/s, after 15 min I have just 1M reads/s, and then it gets worse and worse. Even when I cancel the job, data are still being written to HBase (I wrote a sink similar to the example, with a cache of hundreds of HBase Puts to be a bit more efficient).
>>>>>>>
>>>>>>> When I don't use a sink, it seems to stay at 1M reads/s.
>>>>>>>
>>>>>>> Do you have an idea why?
>>>>>>>
>>>>>>> Here is a bit of code if needed:
>>>>>>> final WindowedStream<ANA, Tuple, TimeWindow> ws = hbaseDS.keyBy(0)
>>>>>>>     .assignTimestampsAndWatermarks(new xxxxAscendingTimestampExtractor())
>>>>>>>     .keyBy(0)
>>>>>>>     .timeWindow(Time.days(1));
>>>>>>>
>>>>>>> final SingleOutputStreamOperator<Put> puts = ws.apply(new WindowFunction<ANA, Put, Tuple, TimeWindow>() {
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public void apply(final Tuple key, final TimeWindow window, final Iterable<ANA> input,
>>>>>>>             final Collector<Put> out) throws Exception {
>>>>>>>
>>>>>>>         final SummaryStatistics summaryStatistics = new SummaryStatistics();
>>>>>>>         for (final ANA ana : input) {
>>>>>>>             summaryStatistics.addValue(ana.getValue());
>>>>>>>         }
>>>>>>>         final Put put = buildPut((String) key.getField(0), window.getStart(), summaryStatistics);
>>>>>>>         out.collect(put);
>>>>>>>     }
>>>>>>> });
>>>>>>>
>>>>>>> And here is how I started Flink on YARN:
>>>>>>> flink-1.0.3/bin/yarn-session.sh -n 20 -tm 16384 -s 2 -Dtaskmanager.network.numberOfBuffers=4096
>>>>>>>
>>>>>>> Thanks for any feedback!
>>>>>>>
>>>>>>> Christophe
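Fabian's point about window state (fold eagerly into one value versus keep every element until the window fires) can be illustrated without Flink at all. The following is a minimal, framework-free Java sketch, not Flink API code. Stats is a hypothetical, simplified stand-in for Commons Math's SummaryStatistics, and BufferedWindow and FoldingWindow are hypothetical classes that model, respectively, a WindowFunction receiving the whole window as an Iterable and a FoldFunction updating a single accumulator per incoming element. Both produce the same mean; only the amount of state held until the window fires differs.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Framework-free sketch (NOT Flink code) contrasting two ways of computing
 * per-window statistics. All class names here are hypothetical.
 */
public class WindowAggregationSketch {

    /** Hypothetical, simplified stand-in for SummaryStatistics. */
    static final class Stats {
        long n;
        double sum;
        double min = Double.POSITIVE_INFINITY;
        double max = Double.NEGATIVE_INFINITY;

        void addValue(double v) {
            n++;
            sum += v;
            min = Math.min(min, v);
            max = Math.max(max, v);
        }

        double getMean() {
            return n == 0 ? Double.NaN : sum / n;
        }
    }

    /** Models a plain WindowFunction: every element is buffered until the window fires. */
    static final class BufferedWindow {
        final List<Double> elements = new ArrayList<>();

        void add(double v) {
            elements.add(v); // state grows with each element
        }

        Stats fire() {
            // All the aggregation work happens at evaluation time.
            Stats s = new Stats();
            for (double v : elements) {
                s.addValue(v);
            }
            return s;
        }

        int stateSize() {
            return elements.size();
        }
    }

    /** Models the FoldFunction approach: each element is folded into one accumulator on arrival. */
    static final class FoldingWindow {
        final Stats acc = new Stats();

        void add(double v) {
            acc.addValue(v); // eager update, nothing is buffered
        }

        Stats fire() {
            return acc; // the result is already computed
        }

        int stateSize() {
            return 1; // one accumulator, however many elements were added
        }
    }

    public static void main(String[] args) {
        BufferedWindow buffered = new BufferedWindow();
        FoldingWindow folding = new FoldingWindow();
        for (int i = 1; i <= 1000; i++) {
            buffered.add(i);
            folding.add(i);
        }
        System.out.println("buffered mean=" + buffered.fire().getMean() + ", state=" + buffered.stateSize());
        System.out.println("folding  mean=" + folding.fire().getMean() + ", state=" + folding.stateSize());
    }
}
```

Running main prints a mean of 500.5 for both variants, with a state size of 1000 for the buffered window and 1 for the folding window. With one-day windows over billions of points, this is roughly the difference between keeping every element of each open window and keeping one accumulator per key and window.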