Super. Thanks. I'll have to play around with it a bit and get back to you.
Kresten

On 28/11/2011, at 16.17, "Bryan Fink" <br...@basho.com> wrote:

> On Mon, Nov 21, 2011 at 2:46 PM, Kresten Krab Thorup <k...@trifork.com> wrote:
>> I'd like to be able to do a "consistent map/reduce" job, i.e., with "R=2
>> semantics" for an "N=3 bucket". Maybe other people have the same need, but
>> I can't see if this is possible ... perhaps with the new riak_pipe
>> infrastructure?
>
> Hi, Kresten. Indeed, this same topic came up in an independent
> conversation last week. I think there are a few ways to attack it.
> Let's start with yours:
>
>> The map function yields {Key, [{VectorClock,1,Hash}]} for each replica, but
>> needs to run on *all* replicas of objects in a given Bucket. Hash is the
>> real value I'm interested in, i.e., the content-hash for the object; but it
>> could be some other "map" function output.
>>
>> Then, the reduce phase needs to "merge" a list of {VectorClock,N,Hash}
>> tuples, by considering the VectorClocks to determine if results are in
>> "conflict", or if one is before/after the other. N is reduced to the sum of
>> all elements with equal Hash value.
>
> I like many of the ideas in this approach. It has a nice distributed
> data-provenance feel to it. I think the danger lies in the work the
> reduce phase would have to do. With distributed parallel keylisting,
> there's no way to guarantee the order in which map results arrive at
> the reduce processor, so the reduce state may become quite large while
> it tracks all of the key/version pairs that have been produced but do
> not yet satisfy R. Maybe this is manageable, though, so I'll also try
> to answer your questions (and sketch both phases further down):
>
>> - How can I have an M/R job run on *all* vnodes? Not just for objects that
>> are owned by a primary?
>
> The only way to do this right now is to use Riak Pipe directly. Set up
> a pipe with your "map" and "reduce" fittings, then send inputs to it
> such that one input hashes to each vnode. Using riak_pipe_qcover_fsm
> with N=1 might ease this process.
>
>> - The M/R "input" is essentially listkeys(Bucket) ... can this be done
>> using "async keylisting", so that the operation does not hold up the vnode
>> while listing?
>
> Yes, absolutely. The riak_kv_pipe_listkeys module does just this, and
> is also an example of using riak_pipe_qcover_fsm.
>
> These two questions and answers lead to this basic pipe layout:
>
>    [
>     {module = riak_kv_pipe_listkeys},  %% set up with qcover N=1
>
>     {module = riak_kv_pipe_get,
>      chashfun = follow},               %% each vnode processes the keys it produces
>
>     {module = riak_kv_mrc_map,
>      chashfun = follow},
>
>     {module = riak_kv_w_reduce,
>      chashfun = ConstantOrCustom}      %% see below
>    ]
>
> Riak KV reduce fittings are normally set up with a constant chashfun,
> such that *all* results are processed in one place. To help alleviate
> the reduce-state-size problem, I might suggest using a chashfun that
> spreads results, yet makes sure all results for each key end up at the
> same reducer (most likely by hashing the result's key, just as you
> would for determining its KV preflist).
>
> Note also that the "get" fitting has a 'follow' chashfun. This is
> also different from normal MR usage, since we would normally set N=3
> (or whatever the bucket has set) for the qcover-ing listkeys fitting.
> The normal setting ensures that each key is produced only once, but
> N=1 will produce the same key multiple times for buckets where N>1.
> You want each result processed locally ('follow') to get the
> possibly-different vclock/hash stored at that vnode.
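>
> To make those two phases concrete, here's roughly the shape I have in
> mind. This is untested, typed straight into the mail client, and the
> names (MapFun, MergeReplicas, ReduceFun) are mine; the riak_object
> accessors and the vclock/orddict calls are real, but the phash2
> content hash is just a stand-in for whatever hash you actually want:
>
>    %% map: emit one observation from each replica's vnode -- that
>    %% vnode's vclock plus a hash of the value (get_value/1 asserts a
>    %% single value, so decide how you want to treat siblings)
>    MapFun = fun(Obj, _KeyData, _Arg) ->
>                 [{riak_object:key(Obj),
>                   [{riak_object:vclock(Obj), 1,
>                     erlang:phash2(riak_object:get_value(Obj))}]}]
>             end,
>
>    %% merge one key's observations: sum the counts of entries with
>    %% equal hashes and merge their vclocks.  Deciding whether the
>    %% surviving entries are concurrent or ordered is a
>    %% vclock:descends/2 exercise I've left out; an entry whose count
>    %% reaches your R is the "R-consistent" answer for that key.
>    MergeReplicas =
>        fun(VCNHs) ->
>                ByHash = lists:foldl(
>                           fun({VC, N, Hash}, Acc) ->
>                                   orddict:update(
>                                     Hash,
>                                     fun({VCs, Sum}) -> {[VC|VCs], Sum+N} end,
>                                     {[VC], N}, Acc)
>                           end, orddict:new(), VCNHs),
>                [{vclock:merge(VCs), Sum, Hash}
>                 || {Hash, {VCs, Sum}} <- orddict:to_list(ByHash)]
>        end,
>
>    %% reduce: group the {Key, [{VClock,N,Hash}]} map outputs by key
>    %% and merge each group; the output has the same shape as the
>    %% input, so it re-reduces cleanly
>    ReduceFun = fun(Inputs, _Arg) ->
>                    ByKey = lists:foldl(
>                              fun({Key, VCNHs}, Acc) ->
>                                      orddict:append_list(Key, VCNHs, Acc)
>                              end, orddict:new(), Inputs),
>                    [{Key, MergeReplicas(VCNHs)}
>                     || {Key, VCNHs} <- orddict:to_list(ByKey)]
>                end,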
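>
> And wiring it up, using the #fitting_spec{} record from
> riak_pipe.hrl. Again a sketch: double-check the riak_kv_mrc_map and
> riak_kv_w_reduce arg formats against your branch, and note that
> ByKeyHash is my name for the custom chashfun and assumes Bucket is
> bound in the surrounding scope:
>
>    %% spread reduce work, but keep all results for one key together
>    %% by hashing the key just as KV does for its preflist
>    ByKeyHash = fun({Key, _VCNHs}) ->
>                    riak_core_util:chash_key({Bucket, Key})
>                end,
>
>    Spec = [#fitting_spec{name = listkeys,
>                          module = riak_kv_pipe_listkeys},
>            #fitting_spec{name = get,
>                          module = riak_kv_pipe_get,
>                          chashfun = follow},
>            #fitting_spec{name = map,
>                          module = riak_kv_mrc_map,
>                          chashfun = follow,
>                          arg = {{qfun, MapFun}, none}},
>            #fitting_spec{name = reduce,
>                          module = riak_kv_w_reduce,
>                          chashfun = ByKeyHash,
>                          arg = {rct, ReduceFun, none}}],
>    {ok, Pipe} = riak_pipe:exec(Spec, []),
>    %% ...then send the qcover inputs (N=1) so every vnode runs the
>    %% listkeys fitting, and collect results from the reduce fitting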
>
> It may also be possible to take a completely different approach. A
> simple modification of riak_kv_pipe_get could allow it to attempt to
> read all N replicas, perhaps even by simply starting a
> riak_kv_get_fsm. In this case, all of the merging of vclocks would
> happen before the mapping instead of after. But it would also miss
> keys that were not fully replicated, since you'd likely want to
> maintain N=3 for the keylisting qcover operation. I also haven't put
> as much thought into this path, so there may be other demons lurking.
>
>> If someone can sketch a solution, I'd be happy to go hacking on it ...
>
> Hopefully that's enough sketching to at least generate a second round
> of questions. ;) I'd be very interested in hearing how it goes.
> Please fire back with anything that needs more explanation.
>
> -Bryan

_______________________________________________
riak-users mailing list
riak-users@lists.basho.com
http://lists.basho.com/mailman/listinfo/riak-users_lists.basho.com