We have a 6-node Riak cluster and a simple custom Erlang application that runs a 
custom MapReduce job.

We start the MapReduce job through the riak_kv_mrc_pipe module, for example:

Query = [{map, {modfun, Mod, MapFun}, [do_prereduce, {from, 1}], false},
         {reduce, {modfun, Mod, ReduceFun}, [{reduce_phase_batch_size, 1000}], true}],
riak_kv_mrc_pipe:mapred({index, Bucket, Field, From, To}, Query, Timeout).

But if one of the nodes has been down for a long time, the response is 
unpredictable: sometimes it returns {ok,GoodResultList}, but sometimes {ok,[]} 
(an empty list).
We traced riak_kv and riak_pipe and found two problems:
1. In riak_kv_pipe_index and in riak_kv_pipe_listkeys, the created fitting_spec 
always has nval set to 1.
2. The actual error occurs in riak_pipe_vnode:remaining_preflist, which returns 
an empty PrefList for some Hash (#fitting_spec.nval is 1). It uses the 
riak_core_apl:get_primary_apl function.
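
To illustrate why we think point 2 leads to an empty result, here is a toy model 
in plain Erlang. It does not call riak_core at all: the module name 
preflist_sketch, the node-list "ring", and both functions are hypothetical 
simplifications that only mimic the difference between a primaries-only 
preference list and one that also includes fallbacks.

```erlang
-module(preflist_sketch).
-export([primary_apl/4, apl_with_fallbacks/4, demo/0]).

%% Toy model only (assumption, not riak_core code): Ring is an ordered
%% list of nodes, Idx is the 0-based ring position of the first owner
%% for some hash, N is the nval, and Up is the list of reachable nodes.

%% The N owners that follow Idx on the ring, wrapping around.
owners(Ring, Idx, N) ->
    Len = length(Ring),
    [lists:nth(((Idx + I) rem Len) + 1, Ring) || I <- lists:seq(0, N - 1)].

%% Primaries only: owners that are down are simply dropped.
primary_apl(Ring, Idx, N, Up) ->
    [Node || Node <- owners(Ring, Idx, N), lists:member(Node, Up)].

%% Primaries plus fallbacks: each unreachable primary is replaced by the
%% next reachable node that is not already an owner for this hash.
apl_with_fallbacks(Ring, Idx, N, Up) ->
    Owners = owners(Ring, Idx, N),
    UpPrimaries = [{Node, primary} || Node <- Owners, lists:member(Node, Up)],
    Missing = N - length(UpPrimaries),
    Rest = owners(Ring, Idx + N, length(Ring) - N),
    Candidates = [Node || Node <- Rest, lists:member(Node, Up)],
    Fallbacks = [{Node, fallback} || Node <- lists:sublist(Candidates, Missing)],
    UpPrimaries ++ Fallbacks.

%% Six nodes with n3 down, looking at a hash whose first owner is n3.
demo() ->
    Ring = [n1, n2, n3, n4, n5, n6],
    Up = Ring -- [n3],
    {primary_apl(Ring, 2, 1, Up),          %% nval 1, primaries only
     apl_with_fallbacks(Ring, 2, 1, Up),   %% nval 1, with fallbacks
     primary_apl(Ring, 2, 3, Up)}.         %% nval 3, primaries only
```

In this model preflist_sketch:demo() returns {[], [{n4,fallback}], [n4,n5]}: 
with nval 1 and primaries only, any hash owned by the down node gets an empty 
list, while either allowing fallbacks or using a larger nval leaves at least 
one reachable entry.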

But if we use old-style MapReduce, for example:
        {ok, C} = riak:local_client(),
        Me = self(),
        Query = [{map, {modfun, Mod, MapFun}, [do_prereduce, {from, 1}], false},
                 {reduce, {modfun, Mod, ReduceFun}, [{reduce_phase_batch_size, 1000}], true}],
        {ok, {ReqId, FlowPid}} = C:mapred_stream(Query, Me, Timeout),
        {ok, _} = riak_kv_index_fsm_sup:start_index_fsm(
                    zont_riak_connection:riak_node(),
                    [{raw, ReqId, FlowPid},
                     [Bucket, none, {range, Field, From, To}, Timeout, mapred]]),
        luke_flow:collect_output(ReqId, Timeout).

the query executes correctly. The problem is that do_prereduce and 
{reduce_phase_batch_size, 1000} are ignored, which is why execution is slow.


Can you make a recommendation? Should riak_pipe_vnode:remaining_preflist use 
riak_core_apl:get_apl_ann instead, or should #fitting_spec.nval be set to the 
nval from our bucket props?
