Hi,

In the example the result is not correct because the values for a, b, c and d are never forwarded from instance 2, even though they would change the global top-k result. It works, though, if you partition by the key field (tuple field 0, in this case) before doing the summation and the local top-k. I think.

Best,
Aljoscha

On Sun, 23 Aug 2015 at 23:07 Gyula Fóra <gyula.f...@gmail.com> wrote:

> Hey,
>
> I am not sure if I get it, why aren't the results correct?
>
> You don't instantly get the global top-k, but you are always updating it
> with the new local results.
>
> Gyula
>
> Aljoscha Krettek <aljos...@apache.org> wrote (on Sun, 23 Aug 2015 at
> 22:58):
>
>> Hi,
>> I wanted to post something along the same lines but now I don't think the
>> approach with local top-ks and merging works. For example, say you want to
>> get the top-4 and you do the pre-processing in two parallel instances. This
>> input data would lead to incorrect results:
>>
>> Instance 1:
>> a 6
>> b 5
>> c 4
>> d 3
>>
>> Instance 2:
>> e 10
>> f 9
>> g 8
>> h 7
>> a 6
>> b 5
>> c 4
>> d 3
>>
>> So each parallel instance would forward its local top-4, which would lead
>> to the end result:
>> e 10
>> f 9
>> g 8
>> h 7
>>
>> Which is wrong. I think no matter how many elements you forward you can
>> construct cases that lead to wrong results. (The problem seems to be that
>> top-k is inherently global.)
>>
>> Might also be that I'm tired and not seeing this right... :D
>>
>> For the case where your elements are partitioned by some key you should
>> be fine, though, as Gyula mentioned.
>>
>> I'm not familiar with the Spark API, maybe you can help me out. What does
>> updateStateByKey() do if your state is not actually partitioned by a
>> key? Plus, I'm curious in general what Spark does with this call.
>>
>> Cheers,
>> Aljoscha
>>
>> On Sun, 23 Aug 2015 at 22:28 Gyula Fóra <gyf...@apache.org> wrote:
>>
>>> Hey!
>>>
>>> What you are trying to do here is a global rolling aggregation, which is
>>> inherently a DOP 1 (degree of parallelism 1) operation. Your observation
>>> is correct that if you want to use a simple stateful sink, you need to
>>> make sure that you set the parallelism to 1 in order to get correct
>>> results.
>>>
>>> What you can do is to keep local top-ks in a parallel operator (let's
>>> say a flatMap), periodically output the local top-k elements, and merge
>>> them in a sink with parallelism=1 to produce a global top-k.
>>>
>>> I am not 100% sure how you implemented the same functionality in Spark
>>> but there you probably achieved the semantics I described above.
>>>
>>> The whole problem is much easier if you are interested in the top-k
>>> elements grouped by some key, as then you can use partitioned operator
>>> states which will give you the correct results with arbitrary parallelism.
>>>
>>> Cheers,
>>> Gyula
>>>
>>> defstat <defs...@gmail.com> wrote (on Sun, 23 Aug 2015 at 21:40):
>>>
>>>> Hi. I have been struggling for the past few days to find a solution to
>>>> the following problem, using Apache Flink:
>>>>
>>>> I have a stream of vectors, represented by files in a local folder.
>>>> After a new text file is located using DataStream<String> text =
>>>> env.readFileStream(...), I transform (flatMap) the input into a
>>>> SingleOutputStreamOperator<Tuple2<String, Integer>, ?>, with the
>>>> Integer being the score coming from a scoring function.
>>>>
>>>> I want to persist a global HashMap containing the top-k vectors, using
>>>> their scores. I approached the problem using a stateful transformation.
>>>>
>>>> 1. The first problem I have is that the HashMap retains per-sink data
>>>> (so for each thread of workers, one HashMap of data). How can I make
>>>> that a global collection?
>>>>
>>>> 2. Using Apache Spark, I made that possible by
>>>> JavaPairDStream<String, Integer> stateDstream =
>>>> tuples.updateStateByKey(updateFunction);
>>>>
>>>> and then making transformations on the stateDstream. Is there a way I
>>>> can get the same functionality using Flink?
>>>>
>>>> Thanks in advance!
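
To make the counterexample from this thread concrete, here is a minimal plain-Java sketch (no Flink or Spark APIs involved). It replays the two-instance, top-4 input quoted above, first merging the local top-ks as they are and then after routing every record by its key. The names TopKDemo, Rec, topK, mergeLocalTopKs, partitionByKey and exampleInput are made up for illustration, and the hashCode-modulo routing is only an assumed stand-in for a real key partitioner.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TopKDemo {

    /** One (key, score) record; a hypothetical stand-in for Tuple2<String, Integer>. */
    static final class Rec {
        final String key;
        final int score;
        Rec(String key, int score) { this.key = key; this.score = score; }
        @Override public String toString() { return key + " " + score; }
    }

    /** Sums the scores per key and returns the k largest sums (ties broken by key). */
    static List<Rec> topK(List<Rec> records, int k) {
        Map<String, Integer> sums = new HashMap<>();
        for (Rec r : records) {
            sums.merge(r.key, r.score, Integer::sum);
        }
        List<Rec> ranked = new ArrayList<>();
        for (Map.Entry<String, Integer> e : sums.entrySet()) {
            ranked.add(new Rec(e.getKey(), e.getValue()));
        }
        ranked.sort((x, y) -> x.score != y.score
                ? Integer.compare(y.score, x.score)   // higher sum first
                : x.key.compareTo(y.key));            // deterministic tie-break
        return new ArrayList<>(ranked.subList(0, Math.min(k, ranked.size())));
    }

    /** Each instance forwards its local top-k; a single (parallelism 1) step merges them. */
    static List<Rec> mergeLocalTopKs(List<List<Rec>> instances, int k) {
        List<Rec> forwarded = new ArrayList<>();
        for (List<Rec> instance : instances) {
            forwarded.addAll(topK(instance, k));
        }
        return topK(forwarded, k);
    }

    /** Routes every record to one instance based on its key (assumed hash partitioning). */
    static List<List<Rec>> partitionByKey(List<List<Rec>> instances, int parallelism) {
        List<List<Rec>> partitions = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            partitions.add(new ArrayList<>());
        }
        for (List<Rec> instance : instances) {
            for (Rec r : instance) {
                partitions.get(Math.floorMod(r.key.hashCode(), parallelism)).add(r);
            }
        }
        return partitions;
    }

    /** The input of the two parallel instances from the example in the thread. */
    static List<List<Rec>> exampleInput() {
        List<Rec> first = Arrays.asList(
                new Rec("a", 6), new Rec("b", 5), new Rec("c", 4), new Rec("d", 3));
        List<Rec> second = Arrays.asList(
                new Rec("e", 10), new Rec("f", 9), new Rec("g", 8), new Rec("h", 7),
                new Rec("a", 6), new Rec("b", 5), new Rec("c", 4), new Rec("d", 3));
        return Arrays.asList(first, second);
    }

    public static void main(String[] args) {
        List<List<Rec>> input = exampleInput();
        System.out.println("unpartitioned: " + mergeLocalTopKs(input, 4));
        System.out.println("partitioned:   " + mergeLocalTopKs(partitionByKey(input, 2), 4));
    }
}
```

Run as-is, the unpartitioned merge yields [e 10, f 9, g 8, h 7], the wrong result from the thread, while the partitioned merge yields [a 12, b 10, e 10, f 9]. Once all records for a key land on the same instance, each local sum is already the global sum for that key, so any key in the global top-k is also in the local top-k of its instance.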