Hey, I am not sure I get it. Why aren't the results correct?
You don't instantly get the global top-k, but you are always updating it with the new local results.

Gyula

Aljoscha Krettek <aljos...@apache.org> wrote (on Sun, 23 Aug 2015 at 22:58):

> Hi,
> I wanted to post something along the same lines, but now I don't think the
> approach with local top-ks and merging works. For example, say you want to
> get the top-4 and you do the pre-processing in two parallel instances. This
> input data would lead to incorrect results:
>
> Instance 1:
> a 6
> b 5
> c 4
> d 3
>
> Instance 2:
> e 10
> f 9
> g 8
> h 7
> a 6
> b 5
> c 4
> d 3
>
> So each parallel instance would forward its local top-4, which would lead
> to the end result:
> e 10
> f 9
> g 8
> h 7
>
> which is wrong. I think no matter how many elements you forward, you can
> construct cases that lead to wrong results. (The problem seems to be that
> top-k is inherently global.)
>
> It might also be that I'm tired and not seeing this right... :D
>
> For the case where your elements are partitioned by some key you should be
> fine, though, as Gyula mentioned.
>
> I'm not familiar with the Spark API; maybe you can help me out. What does
> updateStateByKey() do if your state is not actually partitioned by a key?
> Also, I'm curious in general what Spark does with this call.
>
> Cheers,
> Aljoscha
>
> On Sun, 23 Aug 2015 at 22:28 Gyula Fóra <gyf...@apache.org> wrote:
>
>> Hey!
>>
>> What you are trying to do here is a global rolling aggregation, which is
>> inherently a DOP-1 operation. Your observation is correct: if you want to
>> use a simple stateful sink, you need to make sure that you set the
>> parallelism to 1 in order to get correct results.
>>
>> What you can do is keep local top-ks in a parallel operator (let's say a
>> flatMap), periodically output the local top-k elements, and merge them in
>> a sink with parallelism=1 to produce a global top-k.
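To make Aljoscha's example concrete, here is a minimal plain-Java sketch (no Flink involved; the `topK` and `sum` helpers are hypothetical names, not Flink API). It shows why forwarding only the local top-4s goes wrong — the counts for "a" and "b" are split across the two instances and never add up — while summing the full per-key counts first gives the right answer:

```java
import java.util.*;
import java.util.stream.*;

public class TopKMergeDemo {
    // Hypothetical helper: the k keys with the highest counts, best first.
    static List<String> topK(Map<String, Integer> counts, int k) {
        return counts.entrySet().stream()
                .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
                .limit(k)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    // Sum per-key counts from several partial maps into one global map.
    static Map<String, Integer> sum(List<Map<String, Integer>> parts) {
        Map<String, Integer> out = new HashMap<>();
        for (Map<String, Integer> p : parts) {
            p.forEach((k, v) -> out.merge(k, v, Integer::sum));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Integer> instance1 = Map.of("a", 6, "b", 5, "c", 4, "d", 3);
        Map<String, Integer> instance2 = Map.of("e", 10, "f", 9, "g", 8, "h", 7,
                                                "a", 6, "b", 5, "c", 4, "d", 3);

        // Wrong: each instance forwards only its local top-4, so the split
        // counts for "a" and "b" are never combined.
        Map<String, Integer> fromLocalTop4 = new HashMap<>();
        topK(instance1, 4).forEach(k -> fromLocalTop4.merge(k, instance1.get(k), Integer::sum));
        topK(instance2, 4).forEach(k -> fromLocalTop4.merge(k, instance2.get(k), Integer::sum));
        System.out.println(topK(fromLocalTop4, 4)); // [e, f, g, h]

        // Right: sum the full per-key counts first, then take the top-4.
        Map<String, Integer> global = sum(List.of(instance1, instance2));
        System.out.println(topK(global, 4)); // "a" (12) comes first; b and e tie at 10
    }
}
```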
>>
>> I am not 100% sure how you implemented the same functionality in Spark,
>> but there you probably achieved the semantics I described above.
>>
>> The whole problem is much easier if you are interested in the top-k
>> elements grouped by some key, as then you can use partitioned operator
>> state, which will give you correct results with arbitrary parallelism.
>>
>> Cheers,
>> Gyula
>>
>> defstat <defs...@gmail.com> wrote (on Sun, 23 Aug 2015 at 21:40):
>>
>>> Hi. I have been struggling for the past few days to find a solution to
>>> the following problem, using Apache Flink:
>>>
>>> I have a stream of vectors, represented by files in a local folder.
>>> After a new text file is detected using DataStream<String> text =
>>> env.readFileStream(...), I transform (flatMap) the input into a
>>> SingleOutputStreamOperator<Tuple2<String, Integer>, ?>, with the
>>> Integer being the score coming from a scoring function.
>>>
>>> I want to persist a global HashMap containing the top-k vectors and
>>> their scores. I approached the problem using a stateful transformation.
>>>
>>> 1. The first problem I have is that the HashMap retains per-sink data
>>> (so for each worker thread, one HashMap of data). How can I make that a
>>> global collection?
>>>
>>> 2. Using Apache Spark, I made that possible with
>>> JavaPairDStream<String, Integer> stateDstream =
>>>     tuples.updateStateByKey(updateFunction);
>>>
>>> and then making transformations on the stateDstream. Is there a way I
>>> can get the same functionality using Flink?
>>>
>>> Thanks in advance!
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Statefull-computation-tp2494.html
>>> Sent from the Apache Flink User Mailing List archive mailing list
>>> archive at Nabble.com.
>>
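For what it's worth, the state that a parallelism-1 stateful sink could keep for the rolling global top-k might look something like this. This is a plain-Java sketch with hypothetical names (`GlobalTopK`, `update`, `current`), not Flink's actual state API; it just keeps the best score seen per vector id:

```java
import java.util.*;
import java.util.stream.*;

// Sketch of the state a single (parallelism-1) sink instance could hold
// to maintain a rolling global top-k over (vectorId, score) tuples.
public class GlobalTopK {
    private final int k;
    private final Map<String, Integer> bestScore = new HashMap<>();

    public GlobalTopK(int k) { this.k = k; }

    // Called once per incoming tuple: remember the best score per vector.
    public void update(String vectorId, int score) {
        bestScore.merge(vectorId, score, Math::max);
    }

    // The current global top-k vector ids, highest score first.
    public List<String> current() {
        return bestScore.entrySet().stream()
                .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
                .limit(k)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }
}
```

Because only one instance of this state exists at parallelism 1, the per-sink HashMap problem from the original question goes away; and as Gyula notes, if your top-k is per key, partitioned state gives you the same effect at arbitrary parallelism.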