Riak's MapReduce functionality cannot survive a node failure. If a vnode 
involved with a query fails while actively processing the request, the entire 
query will have to be re-run. The failed query should be automatically 
terminated, but you'll have to re-run the query yourself.
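
As a rough sketch of what re-running the query yourself could look like, a 
small retry wrapper is shown here. The module name mr_retry and the function 
run/3 are made up for illustration and are not part of Riak; the wrapper only 
assumes that the wrapped call returns {ok, Results} on success.

```erlang
-module(mr_retry).
-export([run/3]).

%% Hypothetical helper, not part of Riak.
%% Calls Fun (a zero-arity fun wrapping the MapReduce request) up to
%% Attempts times, sleeping DelayMs milliseconds between attempts.
%% Returns the first {ok, Results}, or {error, {gave_up, LastFailure}}.
run(Fun, Attempts, DelayMs)
  when is_function(Fun, 0), is_integer(Attempts), Attempts > 0 ->
    %% Turn exceptions into an error tuple so they can be retried too.
    Result = try Fun()
             catch Class:Reason -> {error, {Class, Reason}}
             end,
    case Result of
        {ok, _} = Ok ->
            Ok;
        _ when Attempts > 1 ->
            timer:sleep(DelayMs),
            run(Fun, Attempts - 1, DelayMs);
        LastFailure ->
            {error, {gave_up, LastFailure}}
    end.
```

With the call from your message, that might be used as 
mr_retry:run(fun() -> riak_kv_mrc_pipe:mapred({index,Bucket,Field,From,To}, Query, Timeout) end, 3, 1000). 
Note that an {ok,[]} reply counts as success here, so a wrapper like this only 
helps with queries that actually fail, not with empty result sets.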

If you create queries using Riak Pipe (the technology layer beneath MapReduce), 
it is possible to create queries that can survive a vnode failure, but that is 
not a trivial exercise.

Regarding the empty result set you're seeing: one possibility is that a vnode 
has failed recently and has come back online without data. MapReduce will not 
currently trigger a read repair, but that problem should be resolved with the 
forthcoming Riak 1.3 release.

-John Daily
Technical Evangelist
Basho

On Jan 30, 2013, at 8:05 AM, gu...@mail.mipt.ru wrote:

> We have a 6-node Riak cluster and a simple custom Erlang application that 
> runs a custom MapReduce job.
> 
> We start the MapReduce job using the riak_kv_mrc_pipe module, for example:
> 
> Query = [{map, {modfun, Mod, MapFun}, [do_prereduce, {from, 1}], false},
>          {reduce, {modfun, Mod, ReduceFun}, [{reduce_phase_batch_size, 1000}], true}],
> riak_kv_mrc_pipe:mapred({index, Bucket, Field, From, To}, Query, Timeout).
> 
> But if one of the nodes is down for a long time, the response is 
> unpredictable: sometimes it returns {ok,GoodResultList}, but sometimes 
> {ok,[]} (an empty list).
> We traced riak_kv and riak_pipe and found two problems:
> 1. In riak_kv_pipe_index or riak_kv_pipe_listkeys, the fitting_spec is 
> created with nval always set to 1.
> 2. The actual error occurs in riak_pipe_vnode:remaining_preflist, which 
> returns an empty PrefList for some Hash (#fitting_spec.nval is 1). It uses 
> the riak_core_apl:get_primary_apl function.
> 
> But if we use old-style MapReduce, for example:
>        {ok, C} = riak:local_client(),
>        Me = self(),
>        Query = [{map, {modfun, Mod, MapFun}, [do_prereduce, {from, 1}], false},
>                 {reduce, {modfun, Mod, ReduceFun}, [{reduce_phase_batch_size, 1000}], true}],
>        {ok, {ReqId, FlowPid}} = C:mapred_stream(Query, Me, Timeout),
>        {ok, _} = riak_kv_index_fsm_sup:start_index_fsm(
>                    zont_riak_connection:riak_node(),
>                    [{raw, ReqId, FlowPid},
>                     [Bucket, none, {range, Field, From, To}, Timeout, mapred]]),
>        luke_flow:collect_output(ReqId, Timeout).
> 
> The query executes well. But the problem is that do_prereduce and 
> {reduce_phase_batch_size, 1000} are ignored, which is why execution is slow.
> 
> 
> Can you make some recommendations? Maybe in 
> riak_pipe_vnode:remaining_preflist we need to use riak_core_apl:get_apl_ann, 
> or set #fitting_spec.nval to the nval from our bucket props?
> 
> _______________________________________________
> riak-users mailing list
> riak-users@lists.basho.com
> http://lists.basho.com/mailman/listinfo/riak-users_lists.basho.com

