Hi,
I wanted to post something along the same lines, but now I don't think the
approach with local top-ks and merging works. For example, say you want the
top-4 and you do the pre-processing in two parallel instances. This input
data would lead to incorrect results:
Instance 1: a 6, b 5, c 4, d 3
Instance 2: e 10, f 9, g 8, h 7, a 6, b 5, c 4, d 3

Each parallel instance would forward its local top-4, which would lead to
the end result:

e 10, f 9, g 8, h 7

Which is wrong (if the local scores are partial counts that should be
summed, the true global top-4 is a 12, b 10, e 10, f 9). I think no matter
how many elements you forward, you can construct cases that lead to wrong
results. (The problem seems to be that top-k is inherently global.) Might
also be that I'm tired and not seeing this right... :D

For the case where your elements are partitioned by some key you should be
fine, though, as Gyula mentioned.

I'm not familiar with the Spark API, maybe you can help me out: what does
updateStateByKey() do if your state is not actually partitioned by a key?
Plus, I'm curious in general what Spark does with this call.

Cheers,
Aljoscha

On Sun, 23 Aug 2015 at 22:28 Gyula Fóra <gyf...@apache.org> wrote:
> Hey!
>
> What you are trying to do here is a global rolling aggregation, which is
> inherently a DOP-1 operation. Your observation is correct: if you want to
> use a simple stateful sink, you need to set the parallelism to 1 in order
> to get correct results.
>
> What you can do is keep local top-ks in a parallel operator (let's say a
> flatmap), periodically output the local top-k elements, and merge them in
> a sink with parallelism=1 to produce a global top-k.
>
> I am not 100% sure how you implemented the same functionality in Spark,
> but there you probably achieved the semantics described above.
>
> The whole problem is much easier if you are interested in the top-k
> elements grouped by some key, as then you can use partitioned operator
> state, which will give you correct results with arbitrary parallelism.
>
> Cheers,
> Gyula
>
> On Sun, Aug 23, 2015 at 21:40 defstat <defs...@gmail.com> wrote:
>
>> Hi. I have been struggling for the past few days to find a solution to
>> the following problem, using Apache Flink:
>>
>> I have a stream of vectors, represented by files in a local folder.
>> After a new text file is located using
>> DataStream<String> text = env.readFileStream(...),
>> I transform (flatMap) the input into a
>> SingleOutputStreamOperator<Tuple2<String, Integer>, ?>, with the Integer
>> being the score coming from a scoring function.
>>
>> I want to persist a global HashMap containing the top-k vectors, using
>> their scores. I approached the problem using a stateful transformation.
>>
>> 1. The first problem I have is that the HashMap retains per-sink data
>> (so for each worker thread, one HashMap of data). How can I make that a
>> global collection?
>>
>> 2. Using Apache Spark, I made that possible with
>> JavaPairDStream<String, Integer> stateDstream =
>>     tuples.updateStateByKey(updateFunction);
>> and then making transformations on the stateDstream. Is there a way I
>> can get the same functionality using Flink?
>>
>> Thanks in advance!
>>
>> --
>> View this message in context:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Statefull-computation-tp2494.html
>> Sent from the Apache Flink User Mailing List archive. mailing list
>> archive at Nabble.com.
>>
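P.S. To make the failure mode concrete, here is a plain-Java sketch of the
counterexample from the top of the thread (no Flink involved). It assumes
the local scores are partial counts that should be summed when merging;
the class and method names are just for illustration:

```java
import java.util.*;
import java.util.stream.*;

public class TopKMergeCounterexample {

    // Return the k entries with the highest scores.
    static List<Map.Entry<String, Integer>> topK(Map<String, Integer> scores, int k) {
        return scores.entrySet().stream()
                .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
                .limit(k)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        int k = 4;

        // Local partial counts held by the two parallel instances.
        Map<String, Integer> instance1 = Map.of(
                "a", 6, "b", 5, "c", 4, "d", 3);
        Map<String, Integer> instance2 = Map.of(
                "e", 10, "f", 9, "g", 8, "h", 7,
                "a", 6, "b", 5, "c", 4, "d", 3);

        // Merging only the local top-4s: a and b never reach the merger,
        // even though their summed counts belong in the global top-4.
        Map<String, Integer> merged = new HashMap<>();
        for (Map<String, Integer> local : List.of(instance1, instance2)) {
            for (Map.Entry<String, Integer> e : topK(local, k)) {
                merged.merge(e.getKey(), e.getValue(), Integer::sum);
            }
        }

        // Global view: sum all partial counts first, then take the top-4.
        Map<String, Integer> global = new HashMap<>();
        instance1.forEach((key, v) -> global.merge(key, v, Integer::sum));
        instance2.forEach((key, v) -> global.merge(key, v, Integer::sum));

        System.out.println("merged local top-4s -> " + topK(merged, k)); // e, f, g, h
        System.out.println("true global top-4   -> " + topK(global, k)); // a, b, e, f
    }
}
```

Running it shows the merged-local result {e, f, g, h} while the true global
top-4 is {a 12, b 10, e 10, f 9} - a and b are lost because each instance's
local view underestimates their totals.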