Hi,

Okay, then I understood correctly.
My point was something different. I never said that the approach I suggested would produce results identical to the continuous DOP 1 top-k, because that is impossible to parallelize. What I suggested is to apply batch (or window) updates, which would periodically give you the "current" top-k (so some updates will be overwritten before being sent to the output).

Whether this is feasible depends on the application, but it should probably be fine.

Cheers,
Gyula

On Mon, Aug 24, 2015 at 8:46 AM Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi,
> In the example the result is not correct because the values for a, b, c and
> d are never forwarded from instance 2 even though they would modify the
> global top-k result. It works, though, if you partition by the key field
> (tuple field 0, in this case) before doing the summation and local top-k. I
> think.
>
> Best,
> Aljoscha
>
> On Sun, 23 Aug 2015 at 23:07 Gyula Fóra <gyula.f...@gmail.com> wrote:
>
>> Hey,
>>
>> I am not sure if I get it, why aren't the results correct?
>>
>> You don't instantly get the global top-k, but you are always updating it
>> with the new local results.
>>
>> Gyula
>>
>> Aljoscha Krettek <aljos...@apache.org> wrote (on Sun, Aug 23, 2015, 22:58):
>>
>>> Hi,
>>> I wanted to post something along the same lines but now I don't think
>>> the approach with local top-ks and merging works. For example, if you want
>>> to get top-4 and you do the pre-processing in two parallel instances. This
>>> input data would lead to incorrect results:
>>>
>>> 1. Instance:
>>> a 6
>>> b 5
>>> c 4
>>> d 3
>>>
>>> 2. Instance:
>>> e 10
>>> f 9
>>> g 8
>>> h 7
>>> a 6
>>> b 5
>>> c 4
>>> d 3
>>>
>>> So each parallel instance would forward its local top-4, which would
>>> lead to the end result:
>>> e 10
>>> f 9
>>> g 8
>>> h 7
>>>
>>> Which is wrong. I think no matter how many elements you forward you can
>>> construct cases that lead to wrong results.
>>> (The problem seems to be that top-k is inherently global.)
>>>
>>> Might also be that I'm tired and not seeing this right... :D
>>>
>>> For the case where your elements are partitioned by some key you should
>>> be fine, though, as Gyula mentioned.
>>>
>>> I'm not familiar with the Spark API, maybe you can help me out. What
>>> does updateStateByKey() do if your state is not actually partitioned by
>>> a key? Plus, I'm curious in general what Spark does with this call.
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Sun, 23 Aug 2015 at 22:28 Gyula Fóra <gyf...@apache.org> wrote:
>>>
>>>> Hey!
>>>>
>>>> What you are trying to do here is a global rolling aggregation, which
>>>> is inherently a DOP 1 operation. Your observation is correct that if you
>>>> want to use a simple stateful sink, you need to make sure that you set the
>>>> parallelism to 1 in order to get correct results.
>>>>
>>>> What you can do is to keep local top-ks in a parallel operator (let's
>>>> say a flatmap) and periodically output the local top-k elements and merge
>>>> them in a sink with parallelism=1 to produce a global top-k.
>>>>
>>>> I am not 100% sure how you implemented the same functionality in Spark
>>>> but there you probably achieved the semantics I described above.
>>>>
>>>> The whole problem is much easier if you are interested in the top-k
>>>> elements grouped by some key, as then you can use partitioned operator
>>>> states which will give you the correct results with arbitrary parallelism.
>>>>
>>>> Cheers,
>>>> Gyula
>>>>
>>>> defstat <defs...@gmail.com> wrote (on Sun, Aug 23, 2015, 21:40):
>>>>
>>>>> Hi. I have been struggling for the past few days to find a solution to
>>>>> the following problem, using Apache Flink:
>>>>>
>>>>> I have a stream of vectors, represented by files in a local folder.
>>>>> After a new text file is located using DataStream<String> text =
>>>>> env.readFileStream(...), I transform (flatMap) the input into a
>>>>> SingleOutputStreamOperator<Tuple2<String, Integer>, ?>, with the Integer
>>>>> being the score coming from a scoring function.
>>>>>
>>>>> I want to persist a global HashMap containing the top-k vectors, using
>>>>> their scores. I approached the problem using a stateful transformation.
>>>>> 1. The first problem I have is that the HashMap retains per-sink data (so
>>>>> for each thread of workers, one HashMap of data). How can I make that a
>>>>> global collection?
>>>>>
>>>>> 2. Using Apache Spark, I made that possible by
>>>>> JavaPairDStream<String, Integer> stateDstream =
>>>>> tuples.updateStateByKey(updateFunction);
>>>>>
>>>>> and then making transformations on the stateDstream. Is there a way I can
>>>>> get the same functionality using Flink?
>>>>>
>>>>> Thanks in advance!
>>>>>
>>>>> --
>>>>> View this message in context:
>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Statefull-computation-tp2494.html
>>>>> Sent from the Apache Flink User Mailing List archive. mailing list
>>>>> archive at Nabble.com.
>>>>>
>>>>
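
For reference, here is a minimal plain-Java sketch of the top-4 example quoted in this thread. It does not use the Flink or Spark APIs. It assumes scores are summed per key before ranking, as Aljoscha's reply suggests. The class name TopKMergeSketch, its helper methods, and the hash-based partitioner are hypothetical, made up only for illustration.

```java
import java.util.*;
import java.util.stream.Collectors;

// Hypothetical illustration only: simulates parallel instances with plain lists.
class TopKMergeSketch {
    // Sample data from the thread: the records seen by each parallel instance.
    static final List<Map.Entry<String, Integer>> INSTANCE_1 = List.of(
        Map.entry("a", 6), Map.entry("b", 5), Map.entry("c", 4), Map.entry("d", 3));
    static final List<Map.Entry<String, Integer>> INSTANCE_2 = List.of(
        Map.entry("e", 10), Map.entry("f", 9), Map.entry("g", 8), Map.entry("h", 7),
        Map.entry("a", 6), Map.entry("b", 5), Map.entry("c", 4), Map.entry("d", 3));

    // Sum scores per key, as one instance would for the records it sees.
    static Map<String, Integer> sumByKey(List<Map.Entry<String, Integer>> records) {
        Map<String, Integer> sums = new HashMap<>();
        for (Map.Entry<String, Integer> r : records) {
            sums.merge(r.getKey(), r.getValue(), Integer::sum);
        }
        return sums;
    }

    // The k entries with the largest sums, highest first (ties broken by key).
    static Map<String, Integer> topK(Map<String, Integer> sums, int k) {
        return sums.entrySet().stream()
            .sorted((x, y) -> x.getValue().equals(y.getValue())
                ? x.getKey().compareTo(y.getKey())
                : Integer.compare(y.getValue(), x.getValue()))
            .limit(k)
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
                (p, q) -> p, LinkedHashMap::new));
    }

    // Each instance forwards only its local top-k; a single merge step ranks them.
    static Set<String> mergedTopK(List<List<Map.Entry<String, Integer>>> instances, int k) {
        List<Map.Entry<String, Integer>> forwarded = new ArrayList<>();
        for (List<Map.Entry<String, Integer>> instance : instances) {
            forwarded.addAll(topK(sumByKey(instance), k).entrySet());
        }
        return topK(sumByKey(forwarded), k).keySet();
    }

    // Route every record to an instance chosen by its key's hash, so that all
    // records for one key end up on the same instance.
    static List<List<Map.Entry<String, Integer>>> partitionByKey(
            List<Map.Entry<String, Integer>> records, int parallelism) {
        List<List<Map.Entry<String, Integer>>> out = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            out.add(new ArrayList<>());
        }
        for (Map.Entry<String, Integer> r : records) {
            out.get(Math.floorMod(r.getKey().hashCode(), parallelism)).add(r);
        }
        return out;
    }

    public static void main(String[] args) {
        int k = 4;
        List<Map.Entry<String, Integer>> all = new ArrayList<>(INSTANCE_1);
        all.addAll(INSTANCE_2);
        System.out.println("unpartitioned merge: "
            + mergedTopK(List.of(INSTANCE_1, INSTANCE_2), k));
        System.out.println("partitioned by key:  "
            + mergedTopK(partitionByKey(all, 2), k));
        System.out.println("single instance:     " + topK(sumByKey(all), k).keySet());
    }
}
```

With the data from the thread, the unpartitioned merge returns e, f, g, h. The key-partitioned merge and the single-instance run both return a, b, e, f, because a and b sum to 12 and 10 once both of their records are counted. This only illustrates the static case. The periodic batch or window updates Gyula describes are not modelled here.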