Hi,

There are algorithms that do this with a small, fixed memory footprint. Have
a look at the Count-Min Sketch, for example; several Java implementations
are available.
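
To give a rough idea of how a Count-Min Sketch keeps memory bounded, here is
a minimal sketch in plain Java. It is not taken from any particular library:
the class name, the constructor parameters (depth, width, seed) and the hash
mixing are my own assumptions, chosen only for illustration. Memory is
depth * width counters no matter how many distinct words arrive, and an
estimate can overcount on hash collisions but never undercounts.

```java
import java.util.Random;

/** Illustrative Count-Min Sketch (hypothetical class, not a library API). */
final class CountMinSketch {
    private final int depth;      // number of hash rows
    private final int width;      // counters per row
    private final long[][] table; // depth x width counters, fixed size
    private final int[] seeds;    // one hash seed per row

    CountMinSketch(int depth, int width, long seed) {
        this.depth = depth;
        this.width = width;
        this.table = new long[depth][width];
        this.seeds = new int[depth];
        Random rnd = new Random(seed);
        for (int i = 0; i < depth; i++) {
            seeds[i] = rnd.nextInt();
        }
    }

    // Map an item to a counter index in the given row.
    // The bit mixing is an assumption; any reasonable per-row hash works.
    private int bucket(String item, int row) {
        int h = item.hashCode() ^ seeds[row];
        h ^= (h >>> 16);
        h *= 0x85ebca6b;
        h ^= (h >>> 13);
        return Math.floorMod(h, width);
    }

    // Record `count` occurrences of `item` (count must be non-negative).
    void add(String item, long count) {
        for (int i = 0; i < depth; i++) {
            table[i][bucket(item, i)] += count;
        }
    }

    // Estimated count: the minimum over all rows, which is the row
    // least inflated by collisions with other items.
    long estimate(String item) {
        long min = Long.MAX_VALUE;
        for (int i = 0; i < depth; i++) {
            min = Math.min(min, table[i][bucket(item, i)]);
        }
        return min;
    }
}
```

For an approximate top-K you would typically keep a small set of candidate
words next to the sketch and rank them by estimate(word); that part is left
out here.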

Christophe

2016-06-09 15:29 GMT+02:00 Yukun Guo <gyk....@gmail.com>:

> Thank you very much for the detailed answer. Now I understand a DataStream
> can be repartitioned or “joined” (don’t know the exact terminology) with
> keyBy.
>
> But another question:
> Despite the non-existence of an exact incremental top-k algorithm, I’d like
> to incrementally compute the local word count during one hour, probably
> using a TreeMap for counting. As soon as the hour finishes, the TreeMap is
> converted to a stream of Tuple2 and forwarded to the remaining computation
> thereafter. I’m concerned about the memory usage: the TreeMap and the
> Tuple2 collection hold a huge number of items. Do I have to do some custom
> memory management?
>
> I’m also not sure whether a TreeMap is suitable here. This StackOverflow
> question presents a similar approach:
> http://stackoverflow.com/questions/34681887/how-apache-flink-deal-with-skewed-data,
> but the suggested solution seems rather complicated.
>
> On 8 June 2016 at 08:04, Jamie Grier <ja...@data-artisans.com> wrote:
>
>> Suggestions in-line below...
>>
>> On Mon, Jun 6, 2016 at 7:26 PM, Yukun Guo <gyk....@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I'm working on a project which uses Flink to compute hourly log
>>> statistics like top-K. The logs are fetched from Kafka by a
>>> FlinkKafkaConsumer and packed into a DataStream.
>>>
>>> The problem is, I find the computation quite challenging to express with
>>> Flink's DataStream API:
>>>
>>> 1. If I use something like `logs.timeWindow(Time.hours(1))` and the data
>>> volume is really high, e.g., billions of logs generated in one hour,
>>> will the window grow too large to be handled efficiently?
>>>
>>
>> In the general case you can use:
>>
>>     stream
>>         .timeWindow(...)
>>         .apply(reduceFunction, windowFunction)
>>
>> which can take a ReduceFunction and a WindowFunction.  The ReduceFunction
>> is used to reduce the state on the fly and thereby keep the total state
>> size low.  This can commonly be used in analytics applications to reduce
>> the state size that you're accumulating for each window.  In the specific
>> case of TopK, however, you cannot do this if you want an exact result.  To
>> get an exact result I believe you have to actually keep around all of the
>> data and then calculate TopK at the end in your WindowFunction.  If you are
>> able to use approximate algorithms for your use case then you can calculate
>> a probabilistic incremental TopK based on some sort of sketch-based
>> algorithm.
>>
>>>
>>> 2. We have to create a `KeyedStream` before applying `timeWindow`.
>>> However, the distribution of some keys is skewed, so using them may
>>> compromise performance due to unbalanced partition loads. (What I want
>>> is just to rebalance the stream across all partitions.)
>>>
>>
>> A good and simple way to approach this may be to come up with a composite
>> key for your data that *is* uniformly distributed.  You can imagine
>> something simple like 'natural_key:random_number'.  Key by that composite
>> key and reduce() to pre-aggregate, then keyBy(natural_key) and reduce()
>> again.  For example:
>>
>>     stream
>>         .keyBy(key, rand())      // partition by composite key that is uniformly distributed
>>         .timeWindow(1 hour)
>>         .reduce()                     // pre-aggregation
>>         .keyBy(key)                // repartition
>>         .timeWindow(1 hour)
>>         .reduce()                     // final aggregation
>>
>>
>>>
>>> 3. The top-K algorithm can be straightforwardly implemented with
>>> `DataSet`'s `mapPartition` and `reduceGroup` API as in
>>> [FLINK-2549](https://github.com/apache/flink/pull/1161/), but it is not
>>> so easy with the DataStream approach, even with the stateful operators.
>>> I still cannot figure out how to reunion streams once they are
>>> partitioned.
>>>
>> I'm not sure I know what you're trying to do here.  What do you mean
>> by re-union?
>>
>>
>>> 4. Is it possible to convert a DataStream into a DataSet? If yes, how
>>> can I make Flink analyze the data incrementally rather than aggregating
>>> the logs for one hour before starting to process?
>>>
>> There is no direct way to turn a DataStream into a DataSet.  I addressed
>> the point about doing the computation incrementally above, though.  You do
>> this with a ReduceFunction.  But again, there doesn't exist an exact
>> incremental TopK algorithm that I'm aware of.  This can be done with
>> sketching, though.
>>
>>
>> --
>>
>> Jamie Grier
>> data Artisans, Director of Applications Engineering
>> @jamiegrier <https://twitter.com/jamiegrier>
>> ja...@data-artisans.com
>>
>>
>
