Hi,

There are implementations that do this with a low memory footprint. Have a look at the count-min sketch, for example. There are several Java implementations around.
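To illustrate the idea, here is a toy count-min sketch in Java (class name, seed, and sizing are made up for illustration; for real use, pick one of the existing library implementations, e.g. the one in stream-lib):

    import java.io.Serializable;
    import java.util.Random;

    /** Toy count-min sketch for string items. */
    public class CountMinSketch implements Serializable {

        private final long[][] table;  // depth rows of width counters
        private final int[] seeds;     // one hash seed per row
        private final int width;

        public CountMinSketch(int depth, int width) {
            this.table = new long[depth][width];
            this.seeds = new int[depth];
            this.width = width;
            Random rnd = new Random(12345L);
            for (int i = 0; i < depth; i++) {
                seeds[i] = rnd.nextInt();
            }
        }

        private int bucket(String item, int row) {
            int h = item.hashCode() ^ seeds[row];
            h ^= h >>> 16;  // mix the high bits into the low ones
            return (h & Integer.MAX_VALUE) % width;
        }

        /** Record `count` occurrences of `item`. */
        public void add(String item, long count) {
            for (int row = 0; row < table.length; row++) {
                table[row][bucket(item, row)] += count;
            }
        }

        /** Estimate: the row minimum, never below the true count. */
        public long estimate(String item) {
            long min = Long.MAX_VALUE;
            for (int row = 0; row < table.length; row++) {
                min = Math.min(min, table[row][bucket(item, row)]);
            }
            return min;
        }
    }

With depth 5 and width 65536 the table occupies about 2.6 MB no matter how many distinct words pass through; the trade-off is that estimate() may overcount, though it never undercounts.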
Christophe

2016-06-09 15:29 GMT+02:00 Yukun Guo <gyk....@gmail.com>:

> Thank you very much for the detailed answer. Now I understand a
> DataStream can be repartitioned or "joined" (don't know the exact
> terminology) with keyBy.
>
> But another question: despite the non-existence of an incremental top-K
> algorithm, I'd like to incrementally compute the local word count during
> one hour, probably using a TreeMap for counting. As soon as the hour
> finishes, the TreeMap is converted to a stream of Tuple2 and forwarded
> to the remaining computation thereafter. I'm concerned about the memory
> usage: the TreeMap and the Tuple2 collection hold a huge number of
> items; do I have to do some custom memory management?
>
> I'm also not sure whether a TreeMap is suitable here. This StackOverflow
> question presents a similar approach:
> http://stackoverflow.com/questions/34681887/how-apache-flink-deal-with-skewed-data,
> but the suggested solution seems rather complicated.
>
> On 8 June 2016 at 08:04, Jamie Grier <ja...@data-artisans.com> wrote:
>
>> Suggestions in-line below...
>>
>> On Mon, Jun 6, 2016 at 7:26 PM, Yukun Guo <gyk....@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I'm working on a project which uses Flink to compute hourly log
>>> statistics like top-K. The logs are fetched from Kafka by a
>>> FlinkKafkaConsumer and packed into a DataStream.
>>>
>>> The problem is, I find the computation quite challenging to express
>>> with Flink's DataStream API:
>>>
>>> 1. If I use something like `logs.timeWindow(Time.hours(1))` and the
>>> data volume is really high, e.g., billions of logs generated in one
>>> hour, will the window grow too large to be handled efficiently?
>>
>> In the general case you can use:
>>
>>     stream
>>         .timeWindow(...)
>>         .apply(reduceFunction, windowFunction)
>>
>> which takes both a ReduceFunction and a WindowFunction. The
>> ReduceFunction is used to reduce the state on the fly and thereby keep
>> the total state size low. This is commonly used in analytics
>> applications to keep the state accumulated per window small. In the
>> specific case of top-K, however, you cannot do this if you want an
>> exact result. To get an exact result I believe you have to actually
>> keep around all of the data and then calculate top-K at the end in
>> your WindowFunction. If you are able to use approximate algorithms for
>> your use case, then you can calculate a probabilistic incremental
>> top-K based on some sort of sketch-based algorithm.
>>
>>> 2. We have to create a `KeyedStream` before applying `timeWindow`.
>>> However, the distribution of some keys is skewed, hence using them
>>> may compromise the performance due to unbalanced partition loads.
>>> (What I want is just to rebalance the stream across all partitions.)
>>
>> A good and simple way to approach this may be to come up with a
>> composite key for your data that *is* uniformly distributed. You can
>> imagine something simple like 'natural_key:random_number'. Then
>> keyBy(natural_key) and reduce() again. For example:
>>
>>     stream
>>         .keyBy(key, rand())   // partition by a composite key that is
>>                               // uniformly distributed
>>         .timeWindow(1 hour)
>>         .reduce()             // pre-aggregation
>>         .keyBy(key)           // repartition
>>         .timeWindow(1 hour)
>>         .reduce()             // final aggregation
>>
>>> 3. The top-K algorithm can be straightforwardly implemented with
>>> `DataSet`'s `mapPartition` and `reduceGroup` API as in
>>> [FLINK-2549](https://github.com/apache/flink/pull/1161/), but it is
>>> not so easy with the DataStream approach, even with the stateful
>>> operators. I still cannot figure out how to re-union streams once
>>> they are partitioned.
>>
>> I'm not sure I know what you're trying to do here. What do you mean by
>> re-union?
>>
>>> 4. Is it possible to convert a DataStream into a DataSet? If yes, how
>>> can I make Flink analyze the data incrementally rather than
>>> aggregating the logs for one hour before starting to process?
>>
>> There is no direct way to turn a DataStream into a DataSet. I
>> addressed the point about doing the computation incrementally above,
>> though: you do this with a ReduceFunction. But again, there doesn't
>> exist an exact incremental top-K algorithm that I'm aware of. This can
>> be done with sketching, though.
>>
>> --
>> Jamie Grier
>> data Artisans, Director of Applications Engineering
>> @jamiegrier <https://twitter.com/jamiegrier>
>> ja...@data-artisans.com
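To make the composite-key pattern quoted above concrete, here is a rough Java sketch against the Flink 1.x DataStream API. It assumes the input is a DataStream<Tuple2<String, Long>> of (word, count) pairs; the class name, method name, and the salt range of 16 are all made up for illustration:

    import java.util.Random;

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class SaltedWordCount {

        public static DataStream<Tuple2<String, Long>> hourlyCounts(
                DataStream<Tuple2<String, Long>> words) {
            return words
                // Attach a random salt so skewed words spread evenly
                // across parallel partitions.
                .map(new MapFunction<Tuple2<String, Long>,
                                     Tuple3<String, Integer, Long>>() {
                    private final Random rand = new Random();
                    @Override
                    public Tuple3<String, Integer, Long> map(Tuple2<String, Long> t) {
                        return new Tuple3<>(t.f0, rand.nextInt(16), t.f1);
                    }
                })
                .keyBy(0, 1)                        // composite key: (word, salt)
                .timeWindow(Time.hours(1))
                .reduce(new ReduceFunction<Tuple3<String, Integer, Long>>() {
                    @Override
                    public Tuple3<String, Integer, Long> reduce(
                            Tuple3<String, Integer, Long> a,
                            Tuple3<String, Integer, Long> b) {
                        // Pre-aggregation: partial count per (word, salt).
                        return new Tuple3<>(a.f0, a.f1, a.f2 + b.f2);
                    }
                })
                .map(new MapFunction<Tuple3<String, Integer, Long>,
                                     Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(Tuple3<String, Integer, Long> t) {
                        return new Tuple2<>(t.f0, t.f2);  // drop the salt
                    }
                })
                .keyBy(0)                           // repartition by word alone
                .timeWindow(Time.hours(1))
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> a,
                                                       Tuple2<String, Long> b) {
                        // Final aggregation: at most 16 partials per word.
                        return new Tuple2<>(a.f0, a.f1 + b.f1);
                    }
                });
        }
    }

One detail: the salt has to be attached as a real field before keyBy, because Flink expects the key extracted from a record to be deterministic; drawing the random number inside a KeySelector itself would violate that.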
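And for the exact top-K computed "at the end in your WindowFunction", a sketch assuming Flink 1.1+ (where AllWindowFunction receives an Iterable) and a hypothetical class name. The all-window runs with parallelism 1, which is acceptable here because it only sees one pre-aggregated pair per distinct word rather than the raw logs:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;
    import java.util.PriorityQueue;

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    public class HourlyTopK {

        public static DataStream<Tuple2<String, Long>> topK(
                DataStream<Tuple2<String, Long>> hourlyCounts, final int k) {
            return hourlyCounts
                .timeWindowAll(Time.hours(1))
                .apply(new AllWindowFunction<Tuple2<String, Long>,
                                             Tuple2<String, Long>, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow window,
                                      Iterable<Tuple2<String, Long>> counts,
                                      Collector<Tuple2<String, Long>> out) {
                        // A min-heap of size k holds the k largest counts
                        // seen so far; the smallest is evicted on overflow.
                        PriorityQueue<Tuple2<String, Long>> heap =
                            new PriorityQueue<>(k, byCountAscending());
                        for (Tuple2<String, Long> c : counts) {
                            heap.offer(c);
                            if (heap.size() > k) {
                                heap.poll();
                            }
                        }
                        // Emit the survivors in descending order of count.
                        List<Tuple2<String, Long>> top = new ArrayList<>(heap);
                        Collections.sort(top,
                            Collections.reverseOrder(byCountAscending()));
                        for (Tuple2<String, Long> t : top) {
                            out.collect(t);
                        }
                    }
                });
        }

        private static Comparator<Tuple2<String, Long>> byCountAscending() {
            return new Comparator<Tuple2<String, Long>>() {
                @Override
                public int compare(Tuple2<String, Long> a, Tuple2<String, Long> b) {
                    return Long.compare(a.f1, b.f1);
                }
            };
        }
    }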