Thank you very much for the detailed answer. Now I understand a DataStream can be repartitioned, i.e. re-keyed, with keyBy.

But another question: despite the non-existence of an incremental top-K algorithm, I'd like to incrementally compute the local word counts during one hour, probably using a TreeMap for counting. As soon as the hour finishes, the TreeMap is converted to a stream of Tuple2 and forwarded to the remaining computation. I'm concerned about memory usage: the TreeMap and the Tuple2 collection hold a huge number of items, so do I have to do some custom memory management? I'm also not sure whether a TreeMap is suitable here. This StackOverflow question presents a similar approach: http://stackoverflow.com/questions/34681887/how-apache-flink-deal-with-skewed-data, but the suggested solution seems rather complicated.
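For reference, the reduce-based suggestion in the quoted thread below already covers the incremental part: a ReduceFunction inside an hourly window keeps a single running count per word in Flink-managed window state, so no hand-rolled TreeMap or custom memory management should be needed. A minimal sketch, assuming the Flink 1.x Java DataStream API; the class name and the in-memory source are illustrative stand-ins:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class HourlyWordCount {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

            // In the real job this would be the Kafka source; a fixed
            // element list stands in for it here.
            DataStream<String> words = env.fromElements("foo", "bar", "foo");

            DataStream<Tuple2<String, Long>> hourlyCounts = words
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String w) {
                        return Tuple2.of(w, 1L);
                    }
                })
                .keyBy(0)                       // one partition per word
                .timeWindow(Time.hours(1))
                // The ReduceFunction folds each record into one running
                // count per key, so the window state is a single Tuple2
                // per distinct word rather than a growing buffer.
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> a,
                                                       Tuple2<String, Long> b) {
                        return Tuple2.of(a.f0, a.f1 + b.f1);
                    }
                });

            hourlyCounts.print();
            env.execute("hourly word count");
        }
    }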
On 8 June 2016 at 08:04, Jamie Grier <ja...@data-artisans.com> wrote:

> Suggestions in-line below...
>
> On Mon, Jun 6, 2016 at 7:26 PM, Yukun Guo <gyk....@gmail.com> wrote:
>
>> Hi,
>>
>> I'm working on a project which uses Flink to compute hourly log
>> statistics like top-K. The logs are fetched from Kafka by a
>> FlinkKafkaConsumer and packed into a DataStream.
>>
>> The problem is, I find the computation quite challenging to express
>> with Flink's DataStream API:
>>
>> 1. If I use something like `logs.timeWindow(Time.hours(1))`, suppose
>> the data volume is really high, e.g., billions of logs generated in
>> one hour: will the window grow too large to be handled efficiently?
>>
>
> In the general case you can use:
>
>     stream
>         .timeWindow(...)
>         .apply(reduceFunction, windowFunction)
>
> which takes a ReduceFunction and a WindowFunction. The ReduceFunction
> is used to reduce the state on the fly and thereby keep the total
> state size low. This can commonly be used in analytics applications
> to reduce the state size that you're accumulating for each window. In
> the specific case of top-K, however, you cannot do this if you want
> an exact result. To get an exact result I believe you have to
> actually keep around all of the data and then calculate top-K at the
> end in your WindowFunction. If you are able to use approximate
> algorithms for your use case, then you can calculate a probabilistic
> incremental top-K based on some sort of sketch-based algorithm.
>
>> 2. We have to create a `KeyedStream` before applying `timeWindow`.
>> However, the distribution of some keys is skewed, hence using them
>> may compromise performance due to unbalanced partition loads. (What
>> I want is just to rebalance the stream across all partitions.)
>>
>
> A good and simple way to approach this may be to come up with a
> composite key for your data that *is* uniformly distributed. You can
> imagine something simple like 'natural_key:random_number'. Then
> keyBy(natural_key) and reduce() again. For example:
>
>     stream
>         .keyBy(key, rand())   // partition by a composite key that
>                               // is uniformly distributed
>         .timeWindow(1 hour)
>         .reduce()             // pre-aggregation
>         .keyBy(key)           // repartition
>         .timeWindow(1 hour)
>         .reduce()             // final aggregation
>
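Expanded into compilable Java, the two-phase pattern above might look like the following; the Tuple3 layout, the bucket fan-out of 16, and the class name TwoPhaseCount are illustrative assumptions, not anything prescribed in the thread:

    import java.util.concurrent.ThreadLocalRandom;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class TwoPhaseCount {

        static final int BUCKETS = 16;  // assumed fan-out for hot keys

        static DataStream<Tuple3<String, Integer, Long>> count(
                DataStream<String> words) {
            return words
                .map(new MapFunction<String, Tuple3<String, Integer, Long>>() {
                    @Override
                    public Tuple3<String, Integer, Long> map(String w) {
                        // A random bucket spreads a skewed key over
                        // several parallel pre-aggregators.
                        int b = ThreadLocalRandom.current().nextInt(BUCKETS);
                        return Tuple3.of(w, b, 1L);
                    }
                })
                .keyBy(0, 1)                   // composite key (word, bucket)
                .timeWindow(Time.hours(1))
                .reduce(SUM)                   // partial count per bucket
                .keyBy(0)                      // repartition by word alone
                .timeWindow(Time.hours(1))
                .reduce(SUM);                  // merge the partial counts
        }

        static final ReduceFunction<Tuple3<String, Integer, Long>> SUM =
            new ReduceFunction<Tuple3<String, Integer, Long>>() {
                @Override
                public Tuple3<String, Integer, Long> reduce(
                        Tuple3<String, Integer, Long> a,
                        Tuple3<String, Integer, Long> b) {
                    return Tuple3.of(a.f0, a.f1, a.f2 + b.f2);
                }
            };
    }

With event time, each pre-aggregated record carries the timestamp of the window it came from, so the second hourly window should line up with the first.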
>> 3. The top-K algorithm can be straightforwardly implemented with
>> `DataSet`'s `mapPartition` and `reduceGroup` API as in
>> [FLINK-2549](https://github.com/apache/flink/pull/1161/), but not so
>> easily when taking the DataStream approach, even with the stateful
>> operators. I still cannot figure out how to re-union streams once
>> they are partitioned.
>>
>
> I'm not sure I know what you're trying to do here. What do you mean
> by re-union?
>
>> 4. Is it possible to convert a DataStream into a DataSet? If yes,
>> how can I make Flink analyze the data incrementally rather than
>> aggregating the logs for one hour before starting to process?
>>
>
> There is no direct way to turn a DataStream into a DataSet. I
> addressed the point about doing the computation incrementally above,
> though. You do this with a ReduceFunction. But again, there doesn't
> exist an exact incremental top-K algorithm that I'm aware of. This
> can be done with sketching, though.
>
>
> --
>
> Jamie Grier
> data Artisans, Director of Applications Engineering
> @jamiegrier <https://twitter.com/jamiegrier>
> ja...@data-artisans.com
>
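To make the sketch-based suggestion concrete: one common approximate scheme combines a count-min sketch (fixed-size count estimates) with a bounded candidate set holding the current leaders. Everything below, the sizing, the hashing, and the eviction heuristic, is an illustrative assumption rather than something from the thread:

    import java.util.HashMap;
    import java.util.Map;

    /** Approximate incremental top-K: a count-min sketch estimates
     *  counts in fixed memory, and a bounded candidate map keeps the
     *  current leaders. Sizing and hashing here are illustrative. */
    public class SketchTopK {

        private final int depth = 4;
        private final int width = 1 << 14;
        private final long[][] counts = new long[depth][width];
        private final int k;
        private final Map<String, Long> leaders = new HashMap<>();

        public SketchTopK(int k) {
            this.k = k;
        }

        private int bucket(String item, int row) {
            // One cheap hash per row; a production sketch would use a
            // pairwise-independent hash family instead.
            int h = item.hashCode() * 31 + row * 0x9E3779B9;
            return Math.floorMod(h, width);
        }

        /** Record one occurrence and keep the candidate set at size k. */
        public void add(String item) {
            long estimate = Long.MAX_VALUE;
            for (int row = 0; row < depth; row++) {
                int b = bucket(item, row);
                counts[row][b]++;
                estimate = Math.min(estimate, counts[row][b]);
            }
            leaders.put(item, estimate);
            if (leaders.size() > k) {
                // Evict the candidate with the smallest estimate.
                String smallest = null;
                long min = Long.MAX_VALUE;
                for (Map.Entry<String, Long> e : leaders.entrySet()) {
                    if (e.getValue() < min) {
                        min = e.getValue();
                        smallest = e.getKey();
                    }
                }
                leaders.remove(smallest);
            }
        }

        /** Current approximate top-k items with estimated counts. */
        public Map<String, Long> topK() {
            return leaders;
        }
    }

An instance could be kept per window (e.g. fed record-by-record from a window function) and queried with topK() when the window fires; the memory footprint stays fixed no matter how many distinct words appear.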