Hello Bryan,

I've found the problem.

The problem is in the reduce phase.

1. See the riak_kv_mrc_pipe:mr2pipe_phases implementation. It converts the
MapReduce job spec into a riak_pipe spec.
In that function a ConstHashCookie is created as Now = now() and used as the
chashfun value for the reduce-phase fitting.
This generated value is actually used in riak_kv_w_reduce:done, where the
not-yet-reduced (prereduce) data is sent to the output.
But the output vnode in that case is the preflist for ConstHashCookie, i.e.
some random value, and n_val for this phase is always 1. That is why the
calculated preflist for this phase is sometimes empty.
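To make the failure mode concrete, here is a minimal self-contained sketch. This is NOT the actual riak_kv_mrc_pipe code: the module name, the ring size, and the static partition-to-node ownership are all made up for illustration. It only shows why a constant hash cookie with n_val = 1 can yield an empty preflist when one node is down:

```erlang
%% Illustrative sketch only (not riak_kv_mrc_pipe internals).
%% A constant cookie hashed with n_val = 1 always selects a single
%% partition, so the preflist is empty whenever that partition's
%% owner node is down.
-module(const_hash_sketch).
-export([preflist/2]).

-define(RING_SIZE, 64).

%% Map a constant cookie to one partition, then keep the owner only
%% if it is up. With n_val = 1 there is exactly one candidate owner.
preflist(Cookie, UpNodes) ->
    Partition = erlang:phash2(Cookie, ?RING_SIZE),
    %% hypothetical static ownership: partition P is owned by node (P rem 4)
    Owner = lists:nth((Partition rem 4) + 1,
                      ['dev1@127.0.0.1', 'dev2@127.0.0.1',
                       'dev3@127.0.0.1', 'dev4@127.0.0.1']),
    [Owner || lists:member(Owner, UpNodes)].
```

If the cookie happens to hash to a partition owned by dev4 and dev4 is not in UpNodes, preflist/2 returns [] — the same empty-preflist situation the reduce phase hits.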

Do you have any suggestions on how we can fix it?

Thanks,
Alexander Gunin.

----- Original Message -----
From: gu...@mail.mipt.ru
To: "Bryan Fink" <br...@basho.com>
Cc: "Riak-Users" <riak-users@lists.basho.com>
Sent: Thursday, 31 January 2013 17:34:34
Subject: Re: Differences between riak_client and riak_kv_mrc_pipe MapReduce
when one node is down.

Thank you for your response.
1. Riak 1.2. I cloned it from the GitHub master branch some time ago.
2. 6 nodes in our test environment.
3. More than 100 million (100 thousand per day).
4. Around 100 thousand (indexed by date).

I've prepared a simple test module and a test scenario. It should help you to help me :)

1. Generate four Riak nodes.
[xx riak]$ make devrel
2. Start all nodes and join them to the cluster.
[xx dev]$ dev1/bin/riak-admin ringready
TRUE All nodes agree on the ring ['dev1@127.0.0.1','dev2@127.0.0.1',
                                  'dev3@127.0.0.1','dev4@127.0.0.1']

3. Kill the fourth node.
[xx dev]$ dev4/bin/riak stop

[xx dev]$ dev1/bin/riak-admin ringready
FALSE ['dev4@127.0.0.1'] down.  All nodes need to be up to check.

4. Compile my test module and include it in the code path on all started nodes
(I included it in riak_kv before compiling Riak).

5. Attach to the Erlang console on node dev1.
[xx dev]$ dev1/bin/riak attach

6. Test

Generate the test dataset (50 objects).
(dev1@127.0.0.1)1> mapred_test:make_data(50).
ok

Check that all data is available.
(dev1@127.0.0.1)7> mapred_test:check_data(50).
{not_available,0}
OK, all data saved.
Check that all data is available, 50 times.
(dev1@127.0.0.1)8> mapred_test:check_data_n(50,50).
ok
OK, all the data is really saved.

Now count the objects with the MapReduce job.
(dev1@127.0.0.1)23> mapred_test:pipe_mapreduce_check(50).
{not_available,0}
OK.
...Repeat 7 times.
Attempt #8.
(dev1@127.0.0.1)23> mapred_test:pipe_mapreduce_check(50).
{not_available,50}
!!!Fail!!! The MapReduce actually returned {ok,[]}.

Let's generate some statistics: repeat the MapReduce task 50 times.
(dev1@127.0.0.1)29> mapred_test:pipe_mapreduce_check_n(50,50).
ok count 39
failed count 11.

Roughly 1 in 5 tasks failed (close to the ~1/4 you would expect if a uniformly
random constant hash lands on the one down node out of 4).

Code of the mapred_test module:

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

-module(mapred_test).

-export([map/3,
         reduce/2,
         make_data/1,
         check_data_n/2,
         check_data/1,
         pipe_mapreduce_check/1,
         pipe_mapreduce_check_n/2]).

%% Map phase: count each found object as 1.
map({error,notfound},_,_) ->
    [];
map(_,_,_) ->
    [1].

%% Reduce phase: simple sum.
reduce(L,_) ->
    [lists:foldl(fun(I,Acc) -> I+Acc end, 0, L)].

%% Generate the data set for the test.
make_data(Count) ->
    {ok,Conn} = riak:local_client(),
    make_data_loop(Count,Conn).

make_data_loop(0,_) ->
    ok;
make_data_loop(Count,Conn) ->
    Key = list_to_binary(integer_to_list(Count)),
    RObj = riak_object:new(<<"mapred_test">>,Key,1),
    case Conn:put(RObj) of
        ok ->
            make_data_loop(Count-1,Conn);
        Else ->
            {error,Else}
    end.

%% Count how many of the objects are NOT readable.
check_data(Count) ->
    {ok,Conn} = riak:local_client(),
    {not_available,check_data_loop(Count,Conn,0)}.

check_data_loop(0,_Conn,Acc) ->
    Acc;
check_data_loop(Count,Conn,Acc) ->
    Key = list_to_binary(integer_to_list(Count)),
    case Conn:get(<<"mapred_test">>,Key) of
        {ok,_} ->
            check_data_loop(Count-1,Conn,Acc);
        _ ->
            check_data_loop(Count-1,Conn,Acc+1)
    end.

check_data_n(_Count,0) ->
    ok;
check_data_n(Count,N) ->
    case check_data(Count) of
        {not_available,0} ->
            check_data_n(Count,N-1);
        Else ->
            Else
    end.

pipe_mapreduce_check(Count) ->
    Query = [{map, {modfun,?MODULE,map}, [do_prereduce,none], false},
             {reduce, {modfun,?MODULE,reduce},
              [{reduce_phase_batch_size, 1000}], true}],
    case riak_kv_mrc_pipe:mapred(<<"mapred_test">>,Query,60000) of
        {ok,[I]} when is_integer(I) ->
            {not_available,Count-I};
        {ok,[]} ->
            {not_available,Count};
        Else ->
            Else
    end.

pipe_mapreduce_check_n(Count,N) ->
    {Good,Bad} = pipe_mapreduce_check_n(Count,N,{0,0}),
    io:format("ok count ~p~nfailed count ~p~n",[Good,Bad]).

pipe_mapreduce_check_n(_Count,0,Acc) ->
    Acc;
pipe_mapreduce_check_n(Count,N,{Good,Bad}) ->
    case pipe_mapreduce_check(Count) of
        {not_available,0} ->
            pipe_mapreduce_check_n(Count,N-1,{Good+1,Bad});
        {not_available,_} ->
            pipe_mapreduce_check_n(Count,N-1,{Good,Bad+1});
        Else ->
            {0,{runtime_error,Else}}
    end.


%%%%%%%%%%%%%%%%%%%%%%

Thanks,
Alexander Gunin.

----- Original Message -----
From: "Bryan Fink" <br...@basho.com>
To: gu...@mail.mipt.ru
Cc: "John Daily" <jda...@basho.com>, "Riak-Users"
<riak-users@lists.basho.com>
Sent: Thursday, 31 January 2013 17:03:09
Subject: Re: Differences between riak_client and riak_kv_mrc_pipe MapReduce
when one node is down.

On Thu, Jan 31, 2013 at 6:07 AM,  <gu...@mail.mipt.ru> wrote:
> Sorry John, you misunderstood my question.
> 1. One node (I mean a physical (Erlang) node) in the cluster is down.
> 2. It was down when I started the job, while the job ran, and after it. We
> powered off this node; it is under repair. But we have not removed it from
> the cluster.

Aha. Thank you for the clarification. Sorry for pushing John in the
wrong direction. Your new description leads me to think that the
problem is likely in the reduce phase (where we do, yes, use an nval
of 1, but also a constant hash that doesn't account for node
liveness).

As yet, I've been unable to reproduce exactly what you're seeing,
though. I always get an error instead of an empty result. Answers to
some of these questions may help me:

1. What version of Riak are you running?
2. How many nodes do you have in the cluster?
3. About how many keys are in this bucket?
4. About how many keys do you expect to match the index query?

Thanks,
Bryan

_______________________________________________
riak-users mailing list
riak-users@lists.basho.com
http://lists.basho.com/mailman/listinfo/riak-users_lists.basho.com

