Thanks!

On Mon, Jun 13, 2016 at 12:34 PM, Christophe Salperwyck
<christophe.salperw...@gmail.com> wrote:
> Hi,
> I vote on this issue and I agree this would be nice to have.
>
> Thx!
> Christophe
>
> 2016-06-13 12:26 GMT+02:00 Aljoscha Krettek <aljos...@apache.org>:
>>
>> Hi,
>> I'm afraid this is currently a shortcoming in the API. There is this open
>> Jira issue to track it: https://issues.apache.org/jira/browse/FLINK-3869. We
>> can't fix it before Flink 2.0, though, because we have to keep the API
>> stable on the Flink 1.x release line.
>>
>> Cheers,
>> Aljoscha
>>
>> On Mon, 13 Jun 2016 at 11:04 Christophe Salperwyck
>> <christophe.salperw...@gmail.com> wrote:
>>>
>>> Thanks for the feedback and sorry that I can't try all this straight
>>> away.
>>>
>>> Is there a way to have a different function than:
>>> WindowFunction<SummaryStatistics, SummaryStatistics, Tuple, TimeWindow>()
>>>
>>> I would like to return a HBase Put and not a SummaryStatistics. So
>>> something like this:
>>> WindowFunction<SummaryStatistics, Put, Tuple, TimeWindow>()
>>>
>>> Christophe
>>>
>>> 2016-06-09 17:47 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>>>>
>>>> OK, this indicates that the operator following the source is a
>>>> bottleneck.
>>>>
>>>> If that's the WindowOperator, it makes sense to try the refactoring of
>>>> the WindowFunction.
>>>> Alternatively, you can try to run that operator with a higher
>>>> parallelism.
>>>>
>>>> 2016-06-09 17:39 GMT+02:00 Christophe Salperwyck
>>>> <christophe.salperw...@gmail.com>:
>>>>>
>>>>> Hi Fabian,
>>>>>
>>>>> Thanks for the help, I will try that. The backpressure was on the
>>>>> source (HBase).
>>>>>
>>>>> Christophe
>>>>>
>>>>> 2016-06-09 16:38 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>>>>>>
>>>>>> Hi Christophe,
>>>>>>
>>>>>> where does the backpressure appear? In front of the sink operator or
>>>>>> before the window operator?
>>>>>>
>>>>>> In any case, I think you can improve your WindowFunction if you
>>>>>> convert parts of it into a FoldFunction<ANA, SummaryStatistics>.
>>>>>> The FoldFunction would take care of the statistics computation and the
>>>>>> WindowFunction would only assemble the result record including extracting
>>>>>> the start time of the window.
>>>>>>
>>>>>> Then you could do:
>>>>>>
>>>>>> ws.apply(new SummaryStatistics(), new YourFoldFunction(), new
>>>>>> YourWindowFunction());
>>>>>>
>>>>>> This is more efficient because the FoldFunction is eagerly applied
>>>>>> when ever a new element is added to a window. Hence, the window does only
>>>>>> hold a single value (SummaryStatistics) instead of all element added to 
>>>>>> the
>>>>>> window. In contrast the WindowFunction is called when the window is 
>>>>>> finally
>>>>>> evaluated.
>>>>>>
>>>>>> Hope this helps,
>>>>>> Fabian
>>>>>>
>>>>>> 2016-06-09 14:53 GMT+02:00 Christophe Salperwyck
>>>>>> <christophe.salperw...@gmail.com>:
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I am writing a program to read timeseries from HBase and do some
>>>>>>> daily aggregations (Flink streaming). For now I am just computing some
>>>>>>> average so not very consuming but my HBase read get slower and slower (I
>>>>>>> have few billions of points to read). The back pressure is almost all 
>>>>>>> the
>>>>>>> time close to 1.
>>>>>>>
>>>>>>> I use custom timestamp:
>>>>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>>>>>
>>>>>>> so I implemented a custom extractor based on:
>>>>>>> AscendingTimestampExtractor
>>>>>>>
>>>>>>> At the beginning I have 5M reads/s and after 15 min I have just 1M
>>>>>>> read/s then it get worse and worse. Even when I cancel the job, data are
>>>>>>> still being written in HBase (I did a sink similar to the example - 
>>>>>>> with a
>>>>>>> cache of 100s of HBase Puts to be a bit more efficient).
>>>>>>>
>>>>>>> When I don't put a sink it seems to stay on 1M reads/s.
>>>>>>>
>>>>>>> Do you have an idea why ?
>>>>>>>
>>>>>>> Here is a bit of code if needed:
>>>>>>> final WindowedStream<ANA, Tuple, TimeWindow> ws = hbaseDS.keyBy(0)
>>>>>>> .assignTimestampsAndWatermarks(new xxxxAscendingTimestampExtractor())
>>>>>>> .keyBy(0)
>>>>>>> .timeWindow(Time.days(1));
>>>>>>>
>>>>>>> final SingleOutputStreamOperator<Put> puts = ws.apply(new
>>>>>>> WindowFunction<ANA, Put, Tuple, TimeWindow>() {
>>>>>>>
>>>>>>> @Override
>>>>>>> public void apply(final Tuple key, final TimeWindow window, final
>>>>>>> Iterable<ANA> input,
>>>>>>> final Collector<Put> out) throws Exception {
>>>>>>>
>>>>>>> final SummaryStatistics summaryStatistics = new SummaryStatistics();
>>>>>>> for (final ANA ana : input) {
>>>>>>> summaryStatistics.addValue(ana.getValue());
>>>>>>> }
>>>>>>> final Put put = buildPut((String) key.getField(0), window.getStart(),
>>>>>>> summaryStatistics);
>>>>>>> out.collect(put);
>>>>>>> }
>>>>>>> });
>>>>>>>
>>>>>>> And how I started Flink on YARN :
>>>>>>> flink-1.0.3/bin/yarn-session.sh -n 20 -tm 16384 -s 2
>>>>>>> -Dtaskmanager.network.numberOfBuffers=4096
>>>>>>>
>>>>>>> Thanks for any feedback!
>>>>>>>
>>>>>>> Christophe
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>

Reply via email to